src/server/daemon/event_linux.c

changeset 193
aa8393527b1e
parent 162
b169992137a8
parent 192
6a145e13d933
child 443
ef3c8a0e1fee
--- a/src/server/daemon/event_linux.c	Thu Aug 31 16:29:49 2017 +0200
+++ b/src/server/daemon/event_linux.c	Sat Jan 13 19:01:00 2018 +0100
@@ -30,65 +30,61 @@
 #include <stdlib.h>
 #include <errno.h>
 #include <sys/epoll.h>
+#include <sys/eventfd.h>
 
 #include "../util/systhr.h"
 #include "../util/atomic.h"
 
+#include "../util/io.h"
 
 #include "event.h"
 #include "event_linux.h"
 
 
-event_handler_t* evhandler_create(int numthreads) {
-    event_handler_t *ev = malloc(sizeof(event_handler_t));
-    if(ev == NULL) {
-        return NULL;
-    }
+EVHandler* evhandler_create(EventHandlerConfig *cfg) {
+    EVHandler *ev = malloc(sizeof(EVHandler));
+    ev->current = 0;
+    ev->instances = calloc(cfg->nthreads, sizeof(void*));
+    ev->numins = cfg->nthreads;
     
-    ev->ep = calloc(numthreads, sizeof(int));
-    if(ev->ep == NULL) {
-        free(ev);
-        return NULL;
-    }
-    ev->nep = numthreads;
-    ev->lep = 0;
-    
-    /* create ports event threads */
-    for(int i=0;i<numthreads;i++) {
-        /* create port */
-        ev->ep[i] = epoll_create(64);
-        if(ev->ep[i] == 0) {
-            free(ev->ep);
-            free(ev);
+    for(int i=0;i<cfg->nthreads;i++) {
+        EventHandler *handler = malloc(sizeof(EventHandler));
+        ev->instances[i] = handler;
+        
+        handler->ep = epoll_create(64);
+        if(handler->ep == 0) {
+            // TODO: error
             return NULL;
         }
         
-        /*
-         * start a new handler thread
-         * the thread needs the event port and a pointer to the event handler
-         */
-        ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t));
-        if(conf == NULL) {
-            free(ev->ep);
-            free(ev);
+        int eventpipe[2];
+        if(pipe(eventpipe)) {
             return NULL;
         }
-        conf->handler = ev;
-        conf->ep = ev->ep[i];
+        handler->eventin = eventpipe[0];
+        handler->eventout = eventpipe[1];
         
-        systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
-        /* TODO: error handling */
+        struct epoll_event epev;
+        epev.events = EPOLLIN | EPOLLET; // input event, edge triggered
+        epev.data.ptr = NULL;
+        if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) {
+            return NULL;
+        }
+        
+        SYS_THREAD t = systhread_start(
+                0,
+                0,
+                (thrstartfunc)ev_handle_events,
+                handler);
+        systhread_detach(t);
     }
     
     return ev;
 }
 
 
-void ev_handle_events(ev_thr_conf_t *conf) {
-    event_handler_t *ev = conf->handler;
-    int ep = conf->ep;
-    
-    free(conf);
+void ev_handle_events(EventHandler *ev) {
+    int ep = ev->ep;
     
     //port_event_t events[16];
     struct epoll_event events[16];
@@ -102,9 +98,29 @@
         }
         
         for(int i=0;i<ret;i++) {
-            event_t *event = events[i].data.ptr;
-            if(event->fn) {
-                int saved_ev = event->poll;
+            Event *event = events[i].data.ptr;
+            if(!event) {
+                char ebuf[sizeof(Event*)];
+                int ebufpos = 0;
+                char *b = ebuf;
+                while(ebufpos < sizeof(Event*)) {
+                    ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos);
+                    if(r < 0) {
+                        break;
+                    }
+                    ebufpos += r;
+                }
+                if(ebufpos == sizeof(Event*)) {
+                    intptr_t *p = (intptr_t*)b;
+                    *(&event) = (Event*)*p;
+                    if(event->fn) {
+                        if(!event->fn(ev, event) && event->finish) {
+                            event->finish(ev, event);
+                        }
+                    }
+                }              
+            } else if(event->fn) {
+                int saved_ev = event->events;
                 if(!event->fn(ev, event)) {
                     // event fn returned 0 -> remove event from epoll
                     if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) {
@@ -121,21 +137,16 @@
                         event->finish(ev, event);
                     }
                 } else {
-                    if(saved_ev != event->poll) {
+                    if(saved_ev != event->events) {
                         // event type changed
                         struct epoll_event epev;
                         epev.events = EPOLLET;
                         epev.data.ptr = event;
                         
                         // adjust epoll events
-                        if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) {
-                            epev.events |= EPOLLIN;
-                        }
-                        if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) {
-                            epev.events |= EPOLLOUT;
-                        }
+                        epev.events = ev_convert2sys_events(event->events);
                         
-                        if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, NULL)) {
+                        if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) {
                             log_ereport(
                                     LOG_FAILURE,
                                     "epoll_wait failed: %s",
@@ -148,40 +159,77 @@
     }
 }
 
-/* returns a event handler port */
-int ev_get_port(event_handler_t *h) {
-    int nps = h->nep;
-    if(nps == 1) {
-        return h->ep[0];
+int ev_convert2sys_events(int events) {
+    int e = EPOLLET;
+    if((events & EVENT_POLLIN) == EVENT_POLLIN) {
+        e |= EPOLLIN;
     }
-    
-    int cp = h->lep % nps;
-    ws_atomic_inc32(&h->lep);
-    
-    return h->ep[cp];
+    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
+        e |= EPOLLOUT;
+    }
+    return e;
 }
 
-int ev_pollin(event_handler_t *h, int fd, event_t *event) {
+int ev_pollin(EventHandler *h, int fd, Event *event) {
     event->object = (intptr_t)fd;
-    event->poll = EVENT_POLLIN;
+    event->events = EVENT_POLLIN;
     struct epoll_event epev;
     epev.events = EPOLLIN | EPOLLET; // input event, edge triggered
     epev.data.ptr = event;
-    return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev);
+    return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev);
 }
 
-int ev_pollout(event_handler_t *h, int fd, event_t *event) {
+int ev_pollout(EventHandler *h, int fd, Event *event) {
     event->object = (intptr_t)fd;
-    event->poll = EVENT_POLLOUT;
+    event->events = EVENT_POLLOUT;
     struct epoll_event epev;
     epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered
     epev.data.ptr = event;
-    return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev);
+    return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev);
+}
+
+int ev_remove_poll(EventHandler *h, int fd) {
+    return epoll_ctl(h->ep, EPOLL_CTL_DEL, fd, NULL);
+}
+
+int event_send(EventHandler *h, Event *event) {
+    event->object = 0;
+    event->events = 0;
+    ssize_t r = write(h->eventout, &event, sizeof(Event*));
+    if(r < sizeof(Event*)) {
+        log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno));
+    }
+    return r > 0 ? 0 : 1;
 }
 
-int evt_send(event_handler_t *h, event_t *event) {
-    event->object = 0;
-    // TODO: implement using threadpool or eventfd
-    fprintf(stderr, "Warning: evt_send not implemented\n");
-    return 0;
+// TODO: remove this fake aio
+int ev_aioread(int fd, aiocb_s *cb) {
+    ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
+    cb->result = result;
+    if(result < 0) {
+        cb->result_errno = errno;
+    }
+    return event_send(cb->evhandler, cb->event);
 }
+
+int ev_aiowrite(int fd, aiocb_s *cb) {
+    ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
+    cb->result = result;
+    if(result < 0) {
+        cb->result_errno = errno;
+    }
+    return event_send(cb->evhandler, cb->event);
+}
+
+
+int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event);
+}
+
+int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event);
+}
+
+int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL);
+}

mercurial