#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include "../util/systhr.h"
#include "../util/atomic.h"
#include "../util/io.h"
#include "event.h"
#include "event_linux.h"
EVHandler* evhandler_create(EventHandlerConfig *cfg) {
EVHandler *ev = malloc(
sizeof(EVHandler));
ev->current =
0;
ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
ev->numins = cfg->nthreads;
for(
int i=
0;i<cfg->nthreads;i++) {
EventHandler *handler = malloc(
sizeof(EventHandler));
ev->instances[i] = handler;
handler->ep = epoll_create(
64);
if(handler->ep ==
0) {
return NULL;
}
int eventpipe[
2];
if(pipe(eventpipe)) {
return NULL;
}
handler->eventin = eventpipe[
0];
handler->eventout = eventpipe[
1];
struct epoll_event epev;
epev.events =
EPOLLIN |
EPOLLET;
epev.data.ptr =
NULL;
if(epoll_ctl(handler->ep,
EPOLL_CTL_ADD, handler->eventin, &epev)) {
return NULL;
}
SYS_THREAD t = systhread_start(
0,
0,
(thrstartfunc)ev_handle_events,
handler);
systhread_detach(t);
}
return ev;
}
void ev_handle_events(EventHandler *ev) {
int ep = ev->ep;
struct epoll_event events[
16];
for(;;) {
int ret = epoll_wait(ep, events,
16,
100000);
if(ret == -
1 && errno !=
EINTR) {
log_ereport(
LOG_FAILURE,
"epoll_wait failed: %s", strerror(errno));
continue;
}
for(
int i=
0;i<ret;i++) {
Event *event = events[i].data.ptr;
if(!event) {
char ebuf[
sizeof(Event*)];
int ebufpos =
0;
char *b = ebuf;
while(ebufpos <
sizeof(Event*)) {
ssize_t r = read(ev->eventin, b + ebufpos,
sizeof(Event*)-ebufpos);
if(r <
0) {
break;
}
ebufpos += r;
}
if(ebufpos ==
sizeof(Event*)) {
intptr_t *p = (
intptr_t*)b;
*(&event) = (Event*)*p;
if(event->fn) {
if(!event->fn(ev, event) && event->finish) {
event->finish(ev, event);
}
}
}
}
else if(event->fn) {
int saved_ev = event->events;
if(!event->fn(ev, event)) {
if(epoll_ctl(ep,
EPOLL_CTL_DEL, event->object,
NULL)) {
event->error =
1;
log_ereport(
LOG_FAILURE,
"epoll_ctl failed: fd: %d error: %s",
event->object,
strerror(errno));
}
if(event->finish) {
event->finish(ev, event);
}
}
else {
if(saved_ev != event->events) {
struct epoll_event epev;
epev.events =
EPOLLET;
epev.data.ptr = event;
epev.events = ev_convert2sys_events(event->events);
if(epoll_ctl(ep,
EPOLL_CTL_MOD, event->object, &epev)) {
log_ereport(
LOG_FAILURE,
"epoll_wait failed: %s",
strerror(errno));
}
}
}
}
}
}
}
int ev_convert2sys_events(
int events) {
int e =
EPOLLET;
if((events &
EVENT_POLLIN) ==
EVENT_POLLIN) {
e |=
EPOLLIN;
}
if((events &
EVENT_POLLOUT) ==
EVENT_POLLOUT) {
e |=
EPOLLOUT;
}
return e;
}
int ev_pollin(EventHandler *h,
int fd, Event *event) {
event->object = (
intptr_t)fd;
event->events =
EVENT_POLLIN;
struct epoll_event epev;
epev.events =
EPOLLIN |
EPOLLET;
epev.data.ptr = event;
return epoll_ctl(h->ep,
EPOLL_CTL_ADD, fd, &epev);
}
int ev_pollout(EventHandler *h,
int fd, Event *event) {
event->object = (
intptr_t)fd;
event->events =
EVENT_POLLOUT;
struct epoll_event epev;
epev.events =
EPOLLOUT |
EPOLLET;
epev.data.ptr = event;
return epoll_ctl(h->ep,
EPOLL_CTL_ADD, fd, &epev);
}
int ev_remove_poll(EventHandler *h,
int fd) {
return epoll_ctl(h->ep,
EPOLL_CTL_DEL, fd,
NULL);
}
int event_send(EventHandler *h, Event *event) {
event->object =
0;
event->events =
0;
ssize_t r = write(h->eventout, &event,
sizeof(Event*));
if(r <
sizeof(Event*)) {
log_ereport(
LOG_FAILURE,
"failed to send event: %s", strerror(errno));
}
return r >
0 ?
0 :
1;
}
int ev_aioread(
int fd, aiocb_s *cb) {
ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
cb->result = result;
if(result <
0) {
cb->result_errno = errno;
}
return event_send(cb->evhandler, cb->event);
}
int ev_aiowrite(
int fd, aiocb_s *cb) {
ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
cb->result = result;
if(result <
0) {
cb->result_errno = errno;
}
return event_send(cb->evhandler, cb->event);
}
int event_pollin(EventHandler *ev,
SYS_NETFD fd, Event *event) {
return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_IN, event);
}
int event_pollout(EventHandler *ev,
SYS_NETFD fd, Event *event) {
return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_OUT, event);
}
int event_removepoll(EventHandler *ev,
SYS_NETFD fd) {
return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_NONE,
NULL);
}