makes EventHandler public aio

Wed, 25 Jan 2017 19:19:47 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Wed, 25 Jan 2017 19:19:47 +0100
branch
aio
changeset 159
9ba9f8befa80
parent 157
a0c8e752490d
child 165
6942a8c3e737

makes EventHandler public

src/server/daemon/event.c file | annotate | diff | comparison | revisions
src/server/daemon/event.h file | annotate | diff | comparison | revisions
src/server/daemon/event_solaris.c file | annotate | diff | comparison | revisions
src/server/daemon/event_solaris.h file | annotate | diff | comparison | revisions
src/server/daemon/httprequest.c file | annotate | diff | comparison | revisions
src/server/daemon/httprequest.h file | annotate | diff | comparison | revisions
src/server/daemon/sessionhandler.c file | annotate | diff | comparison | revisions
src/server/daemon/sessionhandler.h file | annotate | diff | comparison | revisions
src/server/public/nsapi.h file | annotate | diff | comparison | revisions
src/server/util/system.c file | annotate | diff | comparison | revisions
--- a/src/server/daemon/event.c	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/event.c	Wed Jan 25 19:19:47 2017 +0100
@@ -26,16 +26,17 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
-#include <ucx/map.h>
+#include "../../ucx/map.h"
+#include "../util/atomic.h"
 
 #include "event.h"
 
 UcxMap *event_handler_map = NULL;
 int numevhandlers = 0;
 
-event_handler_t *default_event_handler = NULL;
+EVHandler *default_event_handler = NULL;
 
-event_handler_t *last_handler_c = NULL;
+EVHandler *last_handler_c = NULL;
 
 int create_event_handler(EventHandlerConfig *cfg) {
     if(event_handler_map == NULL) {
@@ -50,7 +51,7 @@
     }
     
     /* create new handler */
-    event_handler_t *e = evhandler_create(cfg->nthreads);
+    EVHandler *e = evhandler_create(cfg);
     if(e == NULL) {
         return 1;
     }
@@ -95,10 +96,21 @@
 }
 
 
-event_handler_t* get_default_event_handler() {
+EVHandler* get_default_event_handler() {
     return default_event_handler;
 }
 
-event_handler_t* get_event_handler(char *name) {
+EVHandler* get_event_handler(char *name) {
     return ucx_map_cstr_get(event_handler_map, name);
 }
+
+EventHandler* ev_instance(EVHandler *ev) {
+    int nev = ev->numins;
+    if(nev == 1) {
+        return ev->instances[0];
+    }
+    
+    int ins = ev->current & nev;
+    ws_atomic_inc32(&ev->current);
+    return ev->instances[ins];
+}
--- a/src/server/daemon/event.h	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/event.h	Wed Jan 25 19:19:47 2017 +0100
@@ -36,26 +36,11 @@
 extern "C" {
 #endif
 
-#define EVENT_POLLIN   0x1
-#define EVENT_POLLOUT  0x2
-    
-typedef struct event_handler event_handler_t;
-typedef struct event         event_t;
-
-typedef int(*event_func)(event_handler_t*, event_t*);
-
-struct event {
-    pblock       *pb;
-    Session      *sn;
-    Request      *rq;
-    event_func   fn;
-    event_func   finish;
-    intptr_t     object;
-    int          events;
-    int          poll;
-    void         *cookie;
-    int          error;
-};
+typedef struct EVHandler {
+    EventHandler **instances;
+    uint32_t     numins;
+    uint32_t     current;
+} EVHandler;
 
 typedef struct event_handler_conf {
     sstr_t   name;
@@ -63,27 +48,24 @@
     int      isdefault;
 } EventHandlerConfig;
 
-typedef struct event_handler_object {
-    event_handler_t   *handler;
-    int               nthreads;
-} EventHandlerObject;
-
 int create_event_handler(EventHandlerConfig *cfg);
 
 int check_event_handler_cfg();
 
-event_handler_t* get_default_event_handler();
+EVHandler* get_default_event_handler();
 
-event_handler_t* get_event_handler(char *name);
+EVHandler* get_event_handler(char *name);
+
+EventHandler* ev_instance(EVHandler *ev);
 
 /* implementation in event_$platform */
-event_handler_t* evhandler_create(int numthreads);
+EVHandler* evhandler_create(EventHandlerConfig *cfg);
 
-int ev_pollin(event_handler_t *h, int fd, event_t *event);
+int ev_pollin(EventHandler *h, int fd, Event *event);
 
-int ev_pollout(event_handler_t *h, int fd, event_t *event);
+int ev_pollout(EventHandler *h, int fd, Event *event);
 
-int evt_send(event_handler_t *h, event_t *event);
+int evt_send(EventHandler *h, Event *event);
 
 
 #ifdef	__cplusplus
--- a/src/server/daemon/event_solaris.c	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/event_solaris.c	Wed Jan 25 19:19:47 2017 +0100
@@ -32,57 +32,35 @@
 
 #include "event_solaris.h"
 
-event_handler_t* evhandler_create(int numthreads) {
-    event_handler_t *ev = malloc(sizeof(event_handler_t));
-    if(ev == NULL) {
-        return NULL;
-    }
+EVHandler* evhandler_create(EventHandlerConfig *cfg) {
+    EVHandler *ev = malloc(sizeof(EVHandler));
+    ev->current = 0;
+    ev->instances = calloc(cfg->nthreads, sizeof(void*));
+    ev->numins = cfg->nthreads;
     
-    ev->ports = calloc(numthreads, sizeof(int));
-    if(ev->ports == NULL) {
-        free(ev);
-        return NULL;
-    }
-    ev->nports = numthreads;
-    ev->lp = 0;
-    
-    /* create ports event threads */
-    for(int i=0;i<numthreads;i++) {
-        /* create port */
-        ev->ports[i] = port_create();
-        if(ev->ports[i] == 0) {
-            free(ev->ports);
-            free(ev);
+    for(int i=0;i<cfg->nthreads;i++) {
+        EventHandler *handler = malloc(sizeof(EventHandler));
+        ev->instances[i] = handler;
+        
+        handler->port = port_create();
+        if(handler->port == 0) {
+            // TODO: error
             return NULL;
         }
         
-        /*
-         * start a new handler thread
-         * the thread needs the event port and a pointer to the event handler
-         */
-        ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t));
-        if(conf == NULL) {
-            free(ev->ports);
-            free(ev);
-            return NULL;
-        }
-        conf->handler = ev;
-        conf->port = ev->ports[i];
-        
-        systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
-        // TODO: error handling
+        SYS_THREAD t = systhread_start(
+                0,
+                0,
+                (thrstartfunc)ev_handle_events,
+                handler);
+        systhread_detach(t);
     }
     
     return ev;
 }
 
-void ev_handle_events(ev_thr_conf_t *conf) {
-    event_handler_t *ev = conf->handler;
-    int port = conf->port;
-    
-    free(conf);
-    
-    port_event_t events[16];
+void ev_handle_events(EventHandler *ev) {   
+    port_event_t events[64];
     struct timespec timeout;
     timeout.tv_nsec = 0;
     timeout.tv_sec = 600;
@@ -90,7 +68,7 @@
     for(;;) {
         // wait for events
         uint_t nev = 1;
-        int ret = port_getn(port, events, 16, &nev, &timeout);
+        int ret = port_getn(ev->port, events, 64, &nev, &timeout);
         if(ret == -1) {
             // TODO: check for error
             perror("port_getn");
@@ -98,28 +76,21 @@
         }
         
         for(int i=0;i<nev;i++) {
-            event_t *event = events[i].portev_user;
+            Event *event = events[i].portev_user;
             if(event->fn) {
-                int saved_ev = event->poll;
                 if(event->fn(ev, event)) {
                     /*
                      * on solaris we have to reassociate the fd after
                      * each event
                      * we do this if the event function returns 1
                      */
-                    
-                    if(event->poll != saved_ev) {
-                        // event type changed
-                        int ne = 0;
-                        if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) {
-                            ne |= POLLIN;
-                        }
-                        if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) {
-                            ne |= POLLOUT;
-                        }
-                    }
-                    
-                    if(ev_poll(ev, event)) {
+                    if(port_associate(
+                            ev->port,
+                            PORT_SOURCE_FD,
+                            (uintptr_t)event->object,
+                            ev_convert2sys_events(event->events),
+                            event))
+                    {
                         perror("port_associate");
                     }                 
                 } else if(event->finish) {
@@ -130,53 +101,42 @@
     }
 }
 
-// returns a event handler port
-int ev_get_port(event_handler_t *h) {
-    int nps = h->nports;
-    if(nps == 1) {
-        return h->ports[0];
+int ev_convert2sys_events(int events) {
+    int e = 0;
+    if((events & EVENT_POLLIN) == EVENT_POLLIN) {
+        e |= POLLIN;
     }
-    
-    int cp = h->lp % nps;
-    atomic_inc_32(&h->lp);
-    
-    return h->ports[cp];
+    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
+        e |= POLLOUT;
+    }
+    return e;
 }
 
-int ev_pollin(event_handler_t *h, int fd, event_t *event) {
+
+int ev_pollin(EventHandler *h, int fd, Event *event) {
     event->object = (intptr_t)fd;
-    event->events = POLLIN;
-    event->poll = EVENT_POLLIN;
+    event->events = EVENT_POLLIN;
     return port_associate(
-            ev_get_port(h),
+            h->port,
             PORT_SOURCE_FD,
             (uintptr_t)fd,
             POLLIN,
             event);
 }
 
-int ev_pollout(event_handler_t *h, int fd, event_t *event) {
+int ev_pollout(EventHandler *h, int fd, Event *event) {
     event->object = (intptr_t)fd;
-    event->events = POLLOUT;
-    event->poll = EVENT_POLLOUT;
+    event->events = EVENT_POLLOUT;
     return port_associate(
-            ev_get_port(h),
+            h->port,
             PORT_SOURCE_FD,
             (uintptr_t)fd,
             POLLOUT,
             event);
 }
 
-int ev_poll(event_handler_t *h, event_t *event) {
-    return port_associate(
-            ev_get_port(h),
-            PORT_SOURCE_FD,
-            event->object,
-            event->events,
-            event);
+int evt_send(EventHandler *h, Event *event) {
+    event->object = 0;
+    event->events = 0;
+    return port_send(h->port, 0, event);
 }
-
-int evt_send(event_handler_t *h, event_t *event) {
-    event->object = 0;
-    return port_send(ev_get_port(h), 0, event);
-}
--- a/src/server/daemon/event_solaris.h	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/event_solaris.h	Wed Jan 25 19:19:47 2017 +0100
@@ -39,22 +39,13 @@
 extern "C" {
 #endif
 
-struct event_handler {
-    int       *ports;
-    uint32_t  nports;
-    uint32_t  lp;
+struct EventHandler {
+    int port;
 };
 
-typedef struct ev_thr_conf {
-    event_handler_t *handler;
-    int             port;
-} ev_thr_conf_t;
+int ev_convert2sys_events(int events);
 
-void ev_handle_events(ev_thr_conf_t *conf);
-
-int ev_get_port(event_handler_t *h);
-
-int ev_poll(event_handler_t *h, event_t *event);
+void ev_handle_events(EventHandler *ev);
 
 #ifdef	__cplusplus
 }
--- a/src/server/daemon/httprequest.c	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/httprequest.c	Wed Jan 25 19:19:47 2017 +0100
@@ -40,6 +40,7 @@
 #include "httprequest.h"
 #include "config.h"
 #include "vserver.h"
+#include "event.h"
 #include "httplistener.h"
 #include "func.h"
 #include "error.h"
@@ -87,7 +88,7 @@
     return S("/");
 }
 
-int handle_request(HTTPRequest *request, threadpool_t *thrpool) {
+int handle_request(HTTPRequest *request, threadpool_t *thrpool, EventHandler *ev) {
     // handle nsapi request
      
     // create pool
@@ -128,6 +129,11 @@
     sn->sn.fill = 1;
     sn->sn.subject = NULL;
     
+    if(!ev) {
+        ev = ev_instance(get_default_event_handler());
+    }
+    sn->sn.ev = ev;
+    
     // the session needs the current server configuration
     sn->config = request->connection->listener->cfg;
 
--- a/src/server/daemon/httprequest.h	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/httprequest.h	Wed Jan 25 19:19:47 2017 +0100
@@ -79,7 +79,7 @@
  * request: request object
  * pool:    current thread pool or NULL
  */
-int handle_request(HTTPRequest *request, threadpool_t *pool);
+int handle_request(HTTPRequest *request, threadpool_t *pool, EventHandler *ev);
 
 
 
--- a/src/server/daemon/sessionhandler.c	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/sessionhandler.c	Wed Jan 25 19:19:47 2017 +0100
@@ -40,9 +40,9 @@
 #include "httplistener.h"
 
 typedef struct _event_http_io {
-    HTTPRequest *request;
-    HttpParser  *parser;
-    int         error;
+    HTTPRequest  *request;
+    HttpParser   *parser;
+    int          error;
 } EventHttpIO;
 
 
@@ -153,7 +153,7 @@
     }
     
     // process request
-    r = handle_request(&request, NULL); // TODO: use correct thread pool
+    r = handle_request(&request, NULL, NULL); // TODO: use correct thread pool
     
     // TODO: free, see evt_request_finish
 
@@ -221,14 +221,14 @@
      * evt_enq_conn() --> event handler --> handle_request()
      */
     
-    event_handler_t *ev = ((EventSessionHandler*)handler)->eventhandler;
-    
-    event_t *event = malloc(sizeof(event_t));
-    ZERO(event, sizeof(event_t));
+    Event *event = malloc(sizeof(Event));
+    ZERO(event, sizeof(Event));
     event->fn = evt_request_input;
     event->finish = evt_request_finish;
     event->cookie = io;
     
+    EventHandler *ev = ev_instance(((EventSessionHandler*)handler)->eventhandler);
+    
     if(ev_pollin(ev, conn->fd, event) != 0) {
         // TODO: ev_pollin should log, intercept some errors here
         log_ereport(LOG_FAILURE, "Cannot enqueue connection");
@@ -237,7 +237,7 @@
     }
 }
 
-int evt_request_input(event_handler_t *handler, event_t *event) {    
+int evt_request_input(EventHandler *handler, Event *event) {    
     EventHttpIO *io = event->cookie;
     HttpParser  *parser  = io->parser;
     HTTPRequest *request = io->request;
@@ -255,11 +255,11 @@
             // SSL specific error handling
             switch(conn->ssl_error) {
                 case SSL_ERROR_WANT_READ: {
-                    event->poll = EVENT_POLLIN;
+                    event->events = EVENT_POLLIN;
                     return 1;
                 }
                 case SSL_ERROR_WANT_WRITE: {
-                    event->poll = EVENT_POLLOUT;
+                    event->events = EVENT_POLLOUT;
                     return 1;
                 }
             }
@@ -285,7 +285,7 @@
          * we need more data -> return 1 to tell the event handler to
          * continue polling
          */
-        event->poll = EVENT_POLLIN;
+        event->events = EVENT_POLLIN;
         return 1;
     }
     
@@ -312,12 +312,12 @@
     return 0;
 }
 
-int evt_request_finish(event_handler_t *h, event_t *event) { 
+int evt_request_finish(EventHandler *h, Event *event) { 
     EventHttpIO *io = event->cookie;
     HttpParser  *parser  = io->parser;
     HTTPRequest *request = io->request;
       
-    int r = handle_request(request, NULL);
+    int r = handle_request(request, NULL, h);
     if(r != 0) {
         // TODO: error message
         close(request->connection->fd);
@@ -338,7 +338,7 @@
     return 0;
 }
 
-int evt_request_error(event_handler_t *h, event_t *event) { 
+int evt_request_error(EventHandler *h, Event *event) { 
     EventHttpIO *io = event->cookie;
     HttpParser  *parser  = io->parser;
     HTTPRequest *request = io->request;
--- a/src/server/daemon/sessionhandler.h	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/daemon/sessionhandler.h	Wed Jan 25 19:19:47 2017 +0100
@@ -93,7 +93,7 @@
  */
 typedef struct _event_session_handler {
     SessionHandler  sh;
-    event_handler_t *eventhandler;
+    EVHandler       *eventhandler;
 } EventSessionHandler;
 
 /*
@@ -125,9 +125,9 @@
 
 void evt_enq_conn(SessionHandler *handler, Connection *conn);
 
-int evt_request_input(event_handler_t *h, event_t *event);
-int evt_request_finish(event_handler_t *h, event_t *event);
-int evt_request_error(event_handler_t *h, event_t *event);
+int evt_request_input(EventHandler *h, Event *event);
+int evt_request_finish(EventHandler *h, Event *event);
+int evt_request_error(EventHandler *h, Event *event);
 
 void evt_keep_alive(SessionHandler *handler, Connection *conn);
 
--- a/src/server/public/nsapi.h	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/public/nsapi.h	Wed Jan 25 19:19:47 2017 +0100
@@ -602,6 +602,28 @@
 };
 
 
+// nsapi ext begin
+
+#define EVENT_POLLIN   0x1
+#define EVENT_POLLOUT  0x2
+
+typedef struct EventHandler EventHandler;
+typedef struct Event Event;
+
+typedef int(*eventfunc)(EventHandler*, Event*);
+
+struct Event {
+    eventfunc   fn;
+    eventfunc   finish;
+    void        *cookie;
+    intptr_t    object;
+    int         events;
+    int         error;
+};
+
+// nsapi ext end
+
+
 typedef void* CONDVAR;
 typedef void *COUNTING_SEMAPHORE;
 typedef void* CRITICAL;
@@ -681,6 +703,8 @@
     struct in_addr iaddr;
 
     pool_handle_t *pool;
+    
+    EventHandler *ev; /* event handler instance (new) */
 
     void *clauth;       /* v2 ACL client authentication information */
     struct Session *next;
@@ -1525,6 +1549,12 @@
 threadpool_job* threadpool_get_job(threadpool_t *pool);
 void threadpool_run(threadpool_t *pool, job_callback_f func, void *data);
 
+int ev_pollin(EventHandler *h, int fd, Event *event);
+
+int ev_pollout(EventHandler *h, int fd, Event *event);
+
+int evt_send(EventHandler *h, Event *event);
+
 
 // assert
 void ws_log_assert(const char *file, const char *func, int line);
--- a/src/server/util/system.c	Tue Jan 24 23:19:48 2017 +0100
+++ b/src/server/util/system.c	Wed Jan 25 19:19:47 2017 +0100
@@ -390,4 +390,4 @@
 #endif
 }
 
-*/
\ No newline at end of file
+*/

mercurial