refactors kqueue event handler aio

Sat, 04 Feb 2017 16:42:11 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sat, 04 Feb 2017 16:42:11 +0100
branch
aio
changeset 170
711d00eeed25
parent 165
6942a8c3e737
child 172
5580517faafc

refactors kqueue event handler

src/server/daemon/event_bsd.c file | annotate | diff | comparison | revisions
src/server/daemon/event_bsd.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/event_bsd.c	Sat Jan 28 19:42:22 2017 +0100
+++ b/src/server/daemon/event_bsd.c	Sat Feb 04 16:42:11 2017 +0100
@@ -33,81 +33,63 @@
 
 #include "event_bsd.h"
 
-event_handler_t* evhandler_create(int numthreads) {
-    event_handler_t *ev = malloc(sizeof(event_handler_t));
-    if(ev == NULL) {
-        return NULL;
-    }
+EVHandler* evhandler_create(EventHandlerConfig *cfg) {
+    EVHandler *ev = malloc(sizeof(EVHandler));
+    ev->current = 0;
+    ev->instances = calloc(cfg->nthreads, sizeof(void*));
+    ev->numins = cfg->nthreads;
     
-    ev->ports = calloc(numthreads, sizeof(int));
-    if(ev->ports == NULL) {
-        free(ev);
-        return NULL;
-    }
-    ev->nports = numthreads;
-    ev->lp = 0;
-    
-    /* create ports event threads */
-    for(int i=0;i<numthreads;i++) {
-        /* create port */
-        //ev->ports[i] = port_create();
-        ev->ports[i] = kqueue();
-        if(ev->ports[i] == 0) {
-            free(ev->ports);
-            free(ev);
+    for(int i=0;i<cfg->nthreads;i++) {
+        EventHandler *handler = malloc(sizeof(EventHandler));
+        ev->instances[i] = handler;
+        
+        handler->kqueue = kqueue();
+        if(handler->kqueue == 0) {
+            // TODO: error
             return NULL;
         }
         
-        /*
-         * start a new handler thread
-         * the thread needs the event port and a pointer to the event handler
-         */
-        ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t));
-        if(conf == NULL) {
-            free(ev->ports);
-            free(ev);
-            return NULL;
-        }
-        conf->handler = ev;
-        conf->port = ev->ports[i];
-        
-        systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
-        /* TODO: error handling */
+        SYS_THREAD t = systhread_start(
+                0,
+                0,
+                (thrstartfunc)ev_handle_events,
+                handler);
+        systhread_detach(t);
     }
     
     return ev;
 }
 
-void ev_handle_events(ev_thr_conf_t *conf) {
-    event_handler_t *ev = conf->handler;
-    int kq = conf->port;
-    
-    free(conf);
-    
+
+void ev_handle_events(EventHandler *ev) {
     struct timespec timeout;
     timeout.tv_nsec = 0;
     timeout.tv_sec = 600;
     
-    struct kevent events[16];
+    struct kevent events[64];
+    struct kevent changes[64];
+    int numchanges = 0;
     
     for(;;) {
         // wait for events
-        int nev;
-        nev = kevent(kq, NULL, 0, events, 16, &timeout);
+        int nev = kevent(ev->kqueue, changes, numchanges, events, 64, &timeout);
         if(nev == -1) {
             // TODO: check for error
             perror("kevent");
             continue;
         }
         
+        numchanges = 0;
         for(int i=0;i<nev;i++) {
-            event_t *event = (event_t*)events[i].udata;
+            Event *event = (Event*)events[i].udata;
             if(event->fn) {
-                int ep = event->poll;
+                int ep = event->events;
                 if(event->fn(ev, event)) {
-                    // TODO: reassociate?   
-                    // TODO: check ep and event->poll
+                    if(event->events != ep) {
+                        changes[numchanges++].filter = ev_convert2sys_events(ep);
+                    }
                 } else if(event->finish) {
+                    changes[numchanges++].filter = ev_convert2sys_events(ep);
                     event->finish(ev, event);
                 }
             }
@@ -115,33 +97,31 @@
     }
 }
 
-/* returns a event handler port */
-int ev_get_port(event_handler_t *h) {
-    int nps = h->nports;
-    if(nps == 1) {
-        return h->ports[0];
+int ev_convert2sys_events(int events) {
+    int e = 0;
+    if((events & EVENT_POLLIN) == EVENT_POLLIN) {
+        e |= EVFILT_READ;
     }
-    
-    int cp = h->lp % nps;
-    ws_atomic_inc32(&h->lp);
-    
-    return h->ports[cp];
+    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
+        e |= EVFILT_WRITE;
+    }
+    return e;
 }
 
-int ev_pollin(event_handler_t *h, int fd, event_t *event) {
-    event->poll = EVENT_POLLIN;
+int ev_pollin(EventHandler *h, int fd, Event *event) {
+    event->events = EVENT_POLLIN;
     struct kevent kev;
     EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, event);
-    return kevent(ev_get_port(h), &kev, 1, NULL, 0, NULL);
+    return kevent(h->kqueue, &kev, 1, NULL, 0, NULL);
 }
 
-int ev_pollout(event_handler_t *h, int fd, event_t *event) {
-    event->poll = EVENT_POLLOUT;
+int ev_pollout(EventHandler *h, int fd, Event *event) {
+    event->events = EVENT_POLLOUT;
     struct kevent kev;
     EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, event);
-    return kevent(ev_get_port(h), &kev, 1, NULL, 0, NULL);
+    return kevent(h->kqueue, &kev, 1, NULL, 0, NULL);
 }
 
-int evt_send(event_handler_t *h, event_t *event) {
+int evt_send(EventHandler *h, Event *event) {
     return 0;
 }
--- a/src/server/daemon/event_bsd.h	Sat Jan 28 19:42:22 2017 +0100
+++ b/src/server/daemon/event_bsd.h	Sat Feb 04 16:42:11 2017 +0100
@@ -39,20 +39,13 @@
 extern "C" {
 #endif
 
-struct event_handler {
-    int       *ports;
-    uint32_t  nports;
-    uint32_t  lp;
+struct EventHandler {
+    int kqueue;
 };
 
-typedef struct ev_thr_conf {
-    event_handler_t *handler;
-    int             port;
-} ev_thr_conf_t;
+void ev_handle_events(EventHandler *ev);
 
-void ev_handle_events(ev_thr_conf_t *conf);
-
-int ev_get_port(event_handler_t *h);
+int ev_convert2sys_events(int events);
 
 #ifdef	__cplusplus
 }

mercurial