#include <stdio.h>
#include <stdlib.h>
#include "../util/atomic.h"
#include "../util/io.h"
#include "event_bsd.h"
#include "httprequest.h"
EVHandler* evhandler_create(EventHandlerConfig *cfg) {
EVHandler *ev = malloc(
sizeof(EVHandler));
ev->current =
0;
ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
ev->numins = cfg->nthreads;
for(
int i=
0;i<cfg->nthreads;i++) {
EventHandlerKqueue *handler = malloc(
sizeof(EventHandlerKqueue));
memset(handler,
0,
sizeof(EventHandlerKqueue));
ev->instances[i] = handler;
handler->kqueue = kqueue();
if(handler->kqueue <
0) {
log_ereport(
LOG_FAILURE,
"evhandler_create: kqueue: %s", strerror(errno));
return NULL;
}
handler->thr = systhread_start(
0,
0,
(thrstartfunc)ev_handle_events,
handler);
}
return ev;
}
void ev_instance_wait(EventHandler *h) {
EventHandlerLinux *ev = (EventHandlerLinux*)h;
void *ret;
pthread_join(ev->thr, &ret);
}
static volatile int ev_close =
0;
void ev_instance_close(EventHandler *h) {
EventHandlerKqueue *ev = (EventHandlerKqueue*)h;
close(ev->kqueue);
ev_close =
1;
}
static Event shutdown_event;
void ev_instance_shutdown(EventHandler *h) {
event_send(h, &shutdown_event);
}
void ev_handle_events(EventHandlerKqueue *ev) {
EventHandler *h = (EventHandler*)ev;
struct timespec timeout;
timeout.tv_nsec =
0;
timeout.tv_sec =
600;
struct kevent events[
EV_MAX_EVENTS];
struct kevent changes[
EV_MAX_EVENTS*
2];
Event *finished[
EV_MAX_EVENTS];
int numchanges =
0;
int numfinished;
for(;;) {
int nev = kevent(ev->kqueue, changes, numchanges, events,
EV_MAX_EVENTS, &timeout);
if(nev == -
1) {
if(errno !=
EINTR) {
if(!ev_close) {
log_ereport(
LOG_CATASTROPHE,
"kevent failed: %s", strerror(errno));
}
break;
}
continue;
}
numchanges =
0;
int numfinished =
0;
ev->numret =
0;
for(
int i=
0;i<nev;i++) {
Event *event = (Event*)events[i].udata;
if(!event) {
if(events[i].flags ==
0) {
log_ereport(
LOG_WARN,
"Unknown kevent (ident=%d)", (
int)events[i].ident);
}
continue;
}
int event_events = event->events;
if(event->fn) {
int saved_ev = event->events;
if(!event->fn(h, event)) {
if(event->finish) {
finished[numfinished++] = event;
}
event_events =
0;
}
else {
event_events = event->events;
}
if(saved_ev != event_events) {
int e = event_events;
int e_fd = events[i].ident;
if((e &
EVENT_POLLIN) != (saved_ev &
EVENT_POLLIN)) {
if((e &
EVENT_POLLIN) ==
EVENT_POLLIN) {
EV_SET(&changes[numchanges++], e_fd,
EVFILT_READ,
EV_ADD,
0,
0, event);
}
else {
EV_SET(&changes[numchanges++], e_fd,
EVFILT_READ,
EV_DELETE,
0,
0,
NULL);
}
}
if((e &
EVENT_POLLOUT) != (saved_ev &
EVENT_POLLOUT)) {
if((e &
EVENT_POLLOUT) ==
EVENT_POLLOUT) {
EV_SET(&changes[numchanges++], e_fd,
EVFILT_WRITE,
EV_ADD,
0,
0, event);
}
else {
EV_SET(&changes[numchanges++], e_fd,
EVFILT_WRITE,
EV_DELETE,
0,
0,
NULL);
}
}
}
}
else if(event == &shutdown_event) {
ev_instance_close(h);
}
}
for(
int i=
0;i<numfinished;i++) {
Event *event = finished[i];
if(finished[i]->finish) {
finished[i]->finish(h, event);
}
}
for(
int i=
0;i<ev->base.numret;i++) {
EVReturn ret = ev->base.fnreturn[i];
nsapi_saf_return(ret.sn, ret.rq, ret.ret);
}
}
free(ev);
}
int ev_pollin(EventHandler *h,
int fd, Event *event) {
event->events =
EVENT_POLLIN;
struct kevent kev;
EV_SET(&kev, fd,
EVFILT_READ,
EV_ADD,
0,
0, event);
return kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
}
int ev_pollout(EventHandler *h,
int fd, Event *event) {
event->events =
EVENT_POLLOUT;
struct kevent kev;
EV_SET(&kev, fd,
EVFILT_WRITE,
EV_ADD,
0,
0, event);
return kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
}
int ev_remove_poll(EventHandler *h,
int fd) {
struct kevent kev;
EV_SET(&kev, fd,
EVFILT_READ,
EV_DELETE,
0,
0,
NULL);
int r1 = kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
EV_SET(&kev, fd,
EVFILT_WRITE,
EV_DELETE,
0,
0,
NULL);
int r2 = kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
return r1 != -
1 || r2 != -
1 ?
0 :
1;
}
int event_send(EventHandler *h, Event *event) {
return 0;
}
int ev_aioread(
int fd, aiocb_s *cb) {
ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
cb->result = result;
if(result <
0) {
cb->result_errno = errno;
}
return event_send(cb->evhandler, cb->event);
}
int ev_aiowrite(
int fd, aiocb_s *cb) {
ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
cb->result = result;
if(result <
0) {
cb->result_errno = errno;
}
return event_send(cb->evhandler, cb->event);
}
int event_pollin(EventHandler *ev,
SYS_NETFD fd, Event *event) {
return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_IN, event);
}
int event_pollout(EventHandler *ev,
SYS_NETFD fd, Event *event) {
return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_OUT, event);
}
int event_removepoll(EventHandler *ev,
SYS_NETFD fd) {
return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_NONE,
NULL);
}