Wed, 25 Jan 2017 19:19:47 +0100
makes EventHandler public
--- a/src/server/daemon/event.c Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/event.c Wed Jan 25 19:19:47 2017 +0100 @@ -26,16 +26,17 @@ * POSSIBILITY OF SUCH DAMAGE. */ -#include <ucx/map.h> +#include "../../ucx/map.h" +#include "../util/atomic.h" #include "event.h" UcxMap *event_handler_map = NULL; int numevhandlers = 0; -event_handler_t *default_event_handler = NULL; +EVHandler *default_event_handler = NULL; -event_handler_t *last_handler_c = NULL; +EVHandler *last_handler_c = NULL; int create_event_handler(EventHandlerConfig *cfg) { if(event_handler_map == NULL) { @@ -50,7 +51,7 @@ } /* create new handler */ - event_handler_t *e = evhandler_create(cfg->nthreads); + EVHandler *e = evhandler_create(cfg); if(e == NULL) { return 1; } @@ -95,10 +96,21 @@ } -event_handler_t* get_default_event_handler() { +EVHandler* get_default_event_handler() { return default_event_handler; } -event_handler_t* get_event_handler(char *name) { +EVHandler* get_event_handler(char *name) { return ucx_map_cstr_get(event_handler_map, name); } + +EventHandler* ev_instance(EVHandler *ev) { + int nev = ev->numins; + if(nev == 1) { + return ev->instances[0]; + } + + int ins = ev->current & nev; + ws_atomic_inc32(&ev->current); + return ev->instances[ins]; +}
--- a/src/server/daemon/event.h Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/event.h Wed Jan 25 19:19:47 2017 +0100 @@ -36,26 +36,11 @@ extern "C" { #endif -#define EVENT_POLLIN 0x1 -#define EVENT_POLLOUT 0x2 - -typedef struct event_handler event_handler_t; -typedef struct event event_t; - -typedef int(*event_func)(event_handler_t*, event_t*); - -struct event { - pblock *pb; - Session *sn; - Request *rq; - event_func fn; - event_func finish; - intptr_t object; - int events; - int poll; - void *cookie; - int error; -}; +typedef struct EVHandler { + EventHandler **instances; + uint32_t numins; + uint32_t current; +} EVHandler; typedef struct event_handler_conf { sstr_t name; @@ -63,27 +48,24 @@ int isdefault; } EventHandlerConfig; -typedef struct event_handler_object { - event_handler_t *handler; - int nthreads; -} EventHandlerObject; - int create_event_handler(EventHandlerConfig *cfg); int check_event_handler_cfg(); -event_handler_t* get_default_event_handler(); +EVHandler* get_default_event_handler(); -event_handler_t* get_event_handler(char *name); +EVHandler* get_event_handler(char *name); + +EventHandler* ev_instance(EVHandler *ev); /* implementation in event_$platform */ -event_handler_t* evhandler_create(int numthreads); +EVHandler* evhandler_create(EventHandlerConfig *cfg); -int ev_pollin(event_handler_t *h, int fd, event_t *event); +int ev_pollin(EventHandler *h, int fd, Event *event); -int ev_pollout(event_handler_t *h, int fd, event_t *event); +int ev_pollout(EventHandler *h, int fd, Event *event); -int evt_send(event_handler_t *h, event_t *event); +int evt_send(EventHandler *h, Event *event); #ifdef __cplusplus
--- a/src/server/daemon/event_solaris.c Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/event_solaris.c Wed Jan 25 19:19:47 2017 +0100 @@ -32,57 +32,35 @@ #include "event_solaris.h" -event_handler_t* evhandler_create(int numthreads) { - event_handler_t *ev = malloc(sizeof(event_handler_t)); - if(ev == NULL) { - return NULL; - } +EVHandler* evhandler_create(EventHandlerConfig *cfg) { + EVHandler *ev = malloc(sizeof(EVHandler)); + ev->current = 0; + ev->instances = calloc(cfg->nthreads, sizeof(void*)); + ev->numins = cfg->nthreads; - ev->ports = calloc(numthreads, sizeof(int)); - if(ev->ports == NULL) { - free(ev); - return NULL; - } - ev->nports = numthreads; - ev->lp = 0; - - /* create ports event threads */ - for(int i=0;i<numthreads;i++) { - /* create port */ - ev->ports[i] = port_create(); - if(ev->ports[i] == 0) { - free(ev->ports); - free(ev); + for(int i=0;i<cfg->nthreads;i++) { + EventHandler *handler = malloc(sizeof(EventHandler)); + ev->instances[i] = handler; + + handler->port = port_create(); + if(handler->port == 0) { + // TODO: error return NULL; } - /* - * start a new handler thread - * the thread needs the event port and a pointer to the event handler - */ - ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); - if(conf == NULL) { - free(ev->ports); - free(ev); - return NULL; - } - conf->handler = ev; - conf->port = ev->ports[i]; - - systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); - // TODO: error handling + SYS_THREAD t = systhread_start( + 0, + 0, + (thrstartfunc)ev_handle_events, + handler); + systhread_detach(t); } return ev; } -void ev_handle_events(ev_thr_conf_t *conf) { - event_handler_t *ev = conf->handler; - int port = conf->port; - - free(conf); - - port_event_t events[16]; +void ev_handle_events(EventHandler *ev) { + port_event_t events[64]; struct timespec timeout; timeout.tv_nsec = 0; timeout.tv_sec = 600; @@ -90,7 +68,7 @@ for(;;) { // wait for events uint_t nev = 1; - int ret = port_getn(port, events, 16, &nev, &timeout); + int ret = port_getn(ev->port, events, 64, &nev, &timeout); if(ret == -1) { // TODO: check for error perror("port_getn"); @@ -98,28 +76,21 @@ } for(int i=0;i<nev;i++) { - event_t *event = events[i].portev_user; + Event *event = events[i].portev_user; if(event->fn) { - int saved_ev = event->poll; if(event->fn(ev, event)) { /* * on solaris we have to reassociate the fd after * each event * we do this if the event function returns 1 */ - - if(event->poll != saved_ev) { - // event type changed - int ne = 0; - if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { - ne |= POLLIN; - } - if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) { - ne |= POLLOUT; - } - } - - if(ev_poll(ev, event)) { + if(port_associate( + ev->port, + PORT_SOURCE_FD, + (uintptr_t)event->object, + ev_convert2sys_events(event->events), + event)) + { perror("port_associate"); } } else if(event->finish) { @@ -130,53 +101,42 @@ } } -// returns a event handler port -int ev_get_port(event_handler_t *h) { - int nps = h->nports; - if(nps == 1) { - return h->ports[0]; +int ev_convert2sys_events(int events) { + int e = 0; + if((events & EVENT_POLLIN) == EVENT_POLLIN) { + e |= POLLIN; } - - int cp = h->lp % nps; - atomic_inc_32(&h->lp); - - return h->ports[cp]; + if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { + e |= POLLOUT; + } + return e; } -int ev_pollin(event_handler_t *h, int fd, event_t *event) { + +int ev_pollin(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->events = POLLIN; - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; return port_associate( - ev_get_port(h), + h->port, PORT_SOURCE_FD, (uintptr_t)fd, POLLIN, event); } -int ev_pollout(event_handler_t *h, int fd, event_t *event) { +int ev_pollout(EventHandler *h, int fd, Event *event) { event->object = (intptr_t)fd; - event->events = POLLOUT; - event->poll = EVENT_POLLOUT; + event->events = EVENT_POLLOUT; return port_associate( - ev_get_port(h), + h->port, PORT_SOURCE_FD, (uintptr_t)fd, POLLOUT, event); } -int ev_poll(event_handler_t *h, event_t *event) { - return port_associate( - ev_get_port(h), - PORT_SOURCE_FD, - event->object, - event->events, - event); +int evt_send(EventHandler *h, Event *event) { + event->object = 0; + event->events = 0; + return port_send(h->port, 0, event); } - -int evt_send(event_handler_t *h, event_t *event) { - event->object = 0; - return port_send(ev_get_port(h), 0, event); -}
--- a/src/server/daemon/event_solaris.h Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/event_solaris.h Wed Jan 25 19:19:47 2017 +0100 @@ -39,22 +39,13 @@ extern "C" { #endif -struct event_handler { - int *ports; - uint32_t nports; - uint32_t lp; +struct EventHandler { + int port; }; -typedef struct ev_thr_conf { - event_handler_t *handler; - int port; -} ev_thr_conf_t; +int ev_convert2sys_events(int events); -void ev_handle_events(ev_thr_conf_t *conf); - -int ev_get_port(event_handler_t *h); - -int ev_poll(event_handler_t *h, event_t *event); +void ev_handle_events(EventHandler *ev); #ifdef __cplusplus }
--- a/src/server/daemon/httprequest.c Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/httprequest.c Wed Jan 25 19:19:47 2017 +0100 @@ -40,6 +40,7 @@ #include "httprequest.h" #include "config.h" #include "vserver.h" +#include "event.h" #include "httplistener.h" #include "func.h" #include "error.h" @@ -87,7 +88,7 @@ return S("/"); } -int handle_request(HTTPRequest *request, threadpool_t *thrpool) { +int handle_request(HTTPRequest *request, threadpool_t *thrpool, EventHandler *ev) { // handle nsapi request // create pool @@ -128,6 +129,11 @@ sn->sn.fill = 1; sn->sn.subject = NULL; + if(!ev) { + ev = ev_instance(get_default_event_handler()); + } + sn->sn.ev = ev; + // the session needs the current server configuration sn->config = request->connection->listener->cfg;
--- a/src/server/daemon/httprequest.h Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/httprequest.h Wed Jan 25 19:19:47 2017 +0100 @@ -79,7 +79,7 @@ * request: request object * pool: current thread pool or NULL */ -int handle_request(HTTPRequest *request, threadpool_t *pool); +int handle_request(HTTPRequest *request, threadpool_t *pool, EventHandler *ev);
--- a/src/server/daemon/sessionhandler.c Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/sessionhandler.c Wed Jan 25 19:19:47 2017 +0100 @@ -40,9 +40,9 @@ #include "httplistener.h" typedef struct _event_http_io { - HTTPRequest *request; - HttpParser *parser; - int error; + HTTPRequest *request; + HttpParser *parser; + int error; } EventHttpIO; @@ -153,7 +153,7 @@ } // process request - r = handle_request(&request, NULL); // TODO: use correct thread pool + r = handle_request(&request, NULL, NULL); // TODO: use correct thread pool // TODO: free, see evt_request_finish @@ -221,14 +221,14 @@ * evt_enq_conn() --> event handler --> handle_request() */ - event_handler_t *ev = ((EventSessionHandler*)handler)->eventhandler; - - event_t *event = malloc(sizeof(event_t)); - ZERO(event, sizeof(event_t)); + Event *event = malloc(sizeof(Event)); + ZERO(event, sizeof(Event)); event->fn = evt_request_input; event->finish = evt_request_finish; event->cookie = io; + EventHandler *ev = ev_instance(((EventSessionHandler*)handler)->eventhandler); + if(ev_pollin(ev, conn->fd, event) != 0) { // TODO: ev_pollin should log, intercept some errors here log_ereport(LOG_FAILURE, "Cannot enqueue connection"); @@ -237,7 +237,7 @@ } } -int evt_request_input(event_handler_t *handler, event_t *event) { +int evt_request_input(EventHandler *handler, Event *event) { EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request; @@ -255,11 +255,11 @@ // SSL specific error handling switch(conn->ssl_error) { case SSL_ERROR_WANT_READ: { - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; return 1; } case SSL_ERROR_WANT_WRITE: { - event->poll = EVENT_POLLOUT; + event->events = EVENT_POLLOUT; return 1; } } @@ -285,7 +285,7 @@ * we need more data -> return 1 to tell the event handler to * continue polling */ - event->poll = EVENT_POLLIN; + event->events = EVENT_POLLIN; return 1; } @@ -312,12 +312,12 @@ return 0; } -int evt_request_finish(event_handler_t *h, event_t *event) { +int evt_request_finish(EventHandler *h, Event *event) { EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request; - int r = handle_request(request, NULL); + int r = handle_request(request, NULL, h); if(r != 0) { // TODO: error message close(request->connection->fd); @@ -338,7 +338,7 @@ return 0; } -int evt_request_error(event_handler_t *h, event_t *event) { +int evt_request_error(EventHandler *h, Event *event) { EventHttpIO *io = event->cookie; HttpParser *parser = io->parser; HTTPRequest *request = io->request;
--- a/src/server/daemon/sessionhandler.h Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/daemon/sessionhandler.h Wed Jan 25 19:19:47 2017 +0100 @@ -93,7 +93,7 @@ */ typedef struct _event_session_handler { SessionHandler sh; - event_handler_t *eventhandler; + EVHandler *eventhandler; } EventSessionHandler; /* @@ -125,9 +125,9 @@ void evt_enq_conn(SessionHandler *handler, Connection *conn); -int evt_request_input(event_handler_t *h, event_t *event); -int evt_request_finish(event_handler_t *h, event_t *event); -int evt_request_error(event_handler_t *h, event_t *event); +int evt_request_input(EventHandler *h, Event *event); +int evt_request_finish(EventHandler *h, Event *event); +int evt_request_error(EventHandler *h, Event *event); void evt_keep_alive(SessionHandler *handler, Connection *conn);
--- a/src/server/public/nsapi.h Tue Jan 24 23:19:48 2017 +0100 +++ b/src/server/public/nsapi.h Wed Jan 25 19:19:47 2017 +0100 @@ -602,6 +602,28 @@ }; +// nsapi ext begin + +#define EVENT_POLLIN 0x1 +#define EVENT_POLLOUT 0x2 + +typedef struct EventHandler EventHandler; +typedef struct Event Event; + +typedef int(*eventfunc)(EventHandler*, Event*); + +struct Event { + eventfunc fn; + eventfunc finish; + void *cookie; + intptr_t object; + int events; + int error; +}; + +// nsapi ext end + + typedef void* CONDVAR; typedef void *COUNTING_SEMAPHORE; typedef void* CRITICAL; @@ -681,6 +703,8 @@ struct in_addr iaddr; pool_handle_t *pool; + + EventHandler *ev; /* event handler instance (new) */ void *clauth; /* v2 ACL client authentication information */ struct Session *next; @@ -1525,6 +1549,12 @@ threadpool_job* threadpool_get_job(threadpool_t *pool); void threadpool_run(threadpool_t *pool, job_callback_f func, void *data); +int ev_pollin(EventHandler *h, int fd, Event *event); + +int ev_pollout(EventHandler *h, int fd, Event *event); + +int evt_send(EventHandler *h, Event *event); + // assert void ws_log_assert(const char *file, const char *func, int line);