#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <signal.h>
#include <cx/array_list.h>
#include <cx/linked_list.h>
#include "../util/systhr.h"
#include "../util/atomic.h"
#include "../util/io.h"
#include "event.h"
#include "event_linux.h"
#include "httprequest.h"
EVHandler* evhandler_create(EventHandlerConfig *cfg) {
EVHandler *ev = malloc(
sizeof(EVHandler));
ev->current =
0;
ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
ev->numins = cfg->nthreads;
for(
int i=
0;i<cfg->nthreads;i++) {
EventHandlerLinux *handler = malloc(
sizeof(EventHandlerLinux));
memset(handler,
0,
sizeof(EventHandlerLinux));
ev->instances[i] = (EventHandler*)handler;
handler->ep = epoll_create(
64);
if(handler->ep <
0) {
log_ereport(
LOG_FAILURE,
"evhandler_create: epoll_create: %s", strerror(errno));
return NULL;
}
handler->event_fd = eventfd(
0,
EFD_NONBLOCK);
if(handler->event_fd <
0) {
log_ereport(
LOG_FAILURE,
"evhandler_create: eventfd: %s", strerror(errno));
return NULL;
}
EventQueue *queue = malloc(
sizeof(EventQueue));
if(!queue) {
return NULL;
}
queue->numevents =
0;
queue->next =
NULL;
handler->queue_begin = queue;
handler->queue_end = queue;
handler->num_reserve =
0;
if(pthread_mutex_init(&handler->queue_lock,
NULL)) {
log_ereport(
LOG_FAILURE,
"evhandler_create: cannot initialize mutex");
return NULL;
}
struct epoll_event epev;
epev.events =
EPOLLIN |
EPOLLET;
epev.data.ptr =
NULL;
if(epoll_ctl(handler->ep,
EPOLL_CTL_ADD, handler->event_fd, &epev)) {
log_ereport(
LOG_FAILURE,
"evhandler_create: epoll_ctl: %s", strerror(errno));
return NULL;
}
handler->thr = systhread_start(
0,
0,
(thrstartfunc)ev_handle_events,
handler);
}
return ev;
}
/*
 * Blocks the caller until the worker thread of the given handler instance
 * has terminated. The thread's exit value is discarded.
 */
void ev_instance_wait(EventHandler *h) {
    EventHandlerLinux *instance = (EventHandlerLinux*)h;
    void *exit_value;

    pthread_join(instance->thr, &exit_value);
}
/*
 * Set to 1 by ev_instance_close() right before it closes an epoll
 * descriptor. ev_handle_events() reads it when epoll_wait fails and skips
 * the error log if it is set. The flag has file scope, so it is shared by
 * all handler instances and is never reset in this file.
 */
static volatile int ev_close =
0;
/*
 * Closes the epoll descriptor of the given handler instance.
 *
 * ev_close is raised first, so the epoll_wait failure that the event loop
 * sees afterwards is not logged as an error.
 */
void ev_instance_close(EventHandler *h) {
    EventHandlerLinux *instance = (EventHandlerLinux*)h;

    ev_close = 1;
    close(instance->ep);
}
/*
 * Sentinel event used to request the shutdown of a handler instance.
 * ev_instance_shutdown() sends its address through event_send();
 * ev_handle_events() compares a received event without an fn callback
 * against this address and then calls ev_instance_close(). As an object
 * with static storage duration it is zero-initialized.
 */
static Event shutdown_event;
/*
 * Asks the given handler instance to shut down by queueing the
 * shutdown_event sentinel. The result of event_send is ignored.
 */
void ev_instance_shutdown(EventHandler *h) {
    Event *sentinel = &shutdown_event;

    (void)event_send(h, sentinel);
}
/*
 * Event loop of one epoll based handler instance; runs as the instance's
 * worker thread (started from evhandler_create).
 *
 * Each iteration obtains a batch of events either from the instance's
 * internal queue (events queued by event_send) or from epoll_wait, invokes
 * the event callbacks, then the finish callbacks, then delivers the SAF
 * return values collected in ev->base.fnreturn.
 *
 * The loop ends when epoll_wait fails with an error other than EINTR, which
 * is also what happens after ev_instance_close() closed the epoll
 * descriptor. On exit the queue blocks, the mutex, the eventfd and the
 * instance itself are released, so `ev` must not be used afterwards.
 */
void ev_handle_events(EventHandlerLinux *ev) {
    EventHandler *h = (EventHandler*)ev;
    int ep = ev->ep;

    struct epoll_event events[EV_MAX_EVENTS];
    Event* finished[EV_MAX_EVENTS];

    /* > 0 while queued events are known (or assumed) to be pending */
    size_t queue_len = 0;
    int loop_ctn = 0;
    /* never set in this function; the loop is left via break */
    int ev_shutdown = 0;
    while(!ev_shutdown) {
        int ret = 0;
        if(queue_len > 0) {
            /* drain the first queue block into the local events array */
            pthread_mutex_lock(&ev->queue_lock);

            EventQueue *queue = ev->queue_begin;
            queue_len = queue->numevents;
            for(size_t i=0;i<queue_len;i++) {
                /* events == 0 marks the entry as a queued event */
                events[i].events = 0;
                events[i].data.ptr = queue->events[i];
            }
            queue->numevents = 0;

            if(queue->next) {
                /* block fully consumed: advance and recycle or free it */
                ev->queue_begin = ev->queue_begin->next;
                if(ev->num_reserve < EV_QUEUE_RESERVE) {
                    ev->reserve_block[ev->num_reserve++] = queue;
                    queue->next = NULL;
                }
                else {
                    free(queue);
                }
            }

            ret = (int)queue_len;
            /* continue draining in the next iteration if more is queued */
            queue_len = ev->queue_begin->numevents;

            pthread_mutex_unlock(&ev->queue_lock);
        }
        else {
            /* timeout is in milliseconds; EV_IDLE_TIMEOUT is presumably
             * given in seconds -- TODO confirm */
            ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000);
            if(ret == -1) {
                if(errno != EINTR) {
                    if(!ev_close) {
                        log_ereport(
                                LOG_CATASTROPHE,
                                "epoll_wait failed: %s",
                                strerror(errno));
                    }
                    break;
                }
                continue;
            }
        }

        int numfinished = 0;
        ev->base.numret = 0;
        for(int i=0;i<ret;i++) {
            Event *event = events[i].data.ptr;
            if(!event) {
                /* the eventfd is registered with a NULL data pointer */
                uint64_t eventfd_r = 0;
                ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t));
                if(r > 0) {
                    queue_len = eventfd_r;
                }
                else {
                    log_ereport(
                            LOG_FAILURE,
                            "eventhandler: eventfd read failed: %s",
                            strerror(errno));
                }
                continue;
            }

            if(event->fn) {
                /* remember the event mask; the callback may change it */
                int saved_ev = event->events;
                if(!event->fn(h, event)) {
                    /* callback returned 0: stop polling the descriptor
                     * (only if it was polled at all) and schedule finish */
                    if(saved_ev && epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) {
                        event->error = 1;
                        log_ereport(
                                LOG_FAILURE,
                                "epoll_ctl failed: fd: %d error: %s",
                                (int)event->object,
                                strerror(errno));
                    }

                    if(event->finish) {
                        finished[numfinished++] = event;
                    }
                }
                else {
                    if(saved_ev != event->events) {
                        /* callback changed the mask: update registration */
                        struct epoll_event epev;
                        epev.data.ptr = event;
                        epev.events = ev_convert2sys_events(event->events);
                        if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) {
                            log_ereport(
                                    LOG_FAILURE,
                                    "epoll_ctl failed: %s",
                                    strerror(errno));
                        }
                    }
                }
            }
            else if(event == &shutdown_event) {
                /* closing the epoll fd makes the next epoll_wait fail,
                 * which terminates the loop */
                ev_instance_close(h);
            }
        }

        /* finish callbacks run after all event callbacks of this batch */
        for(int i=0;i<numfinished;i++) {
            Event *event = finished[i];
            if(finished[i]->finish) {
                finished[i]->finish(h, event);
            }
        }

        /* deliver the return values collected during this batch */
        for(int i=0;i<ev->base.numret;i++) {
            EVReturn ret = ev->base.fnreturn[i];
            nsapi_saf_return(ret.sn, ret.rq, ret.ret);
        }

        /* check the watchlist when the batch was empty, or at the latest
         * every EV_IDLE_LOOP_CTN iterations */
        if(ret == 0 || ++loop_ctn >= EV_IDLE_LOOP_CTN) {
            watchlist_check(&ev->base, 0);
            loop_ctn = 0;
        }
    }

    ev_queue_free(ev->queue_begin);
    /* recycled blocks are detached from the queue chain and would
     * otherwise be leaked */
    for(int i=0;i<ev->num_reserve;i++) {
        free(ev->reserve_block[i]);
    }
    pthread_mutex_destroy(&ev->queue_lock);
    close(ev->event_fd);
    free(ev);
}
/*
 * Frees every block of a queue chain, starting at `queue` and following
 * the next pointers. A NULL argument is accepted and does nothing.
 */
void ev_queue_free(EventQueue *queue) {
    EventQueue *following;

    for(EventQueue *block = queue; block != NULL; block = following) {
        /* read the link before the block is released */
        following = block->next;
        free(block);
    }
}
/*
 * Translates an EVENT_POLLIN / EVENT_POLLOUT mask into the matching epoll
 * event mask. EPOLLET is always part of the result.
 */
int ev_convert2sys_events(int events) {
    int sys_events = EPOLLET;

    sys_events |= ((events & EVENT_POLLIN) == EVENT_POLLIN) ? EPOLLIN : 0;
    sys_events |= ((events & EVENT_POLLOUT) == EVENT_POLLOUT) ? EPOLLOUT : 0;

    return sys_events;
}
/*
 * Registers `fd` at the instance's epoll descriptor for edge-triggered
 * read readiness (EPOLLIN | EPOLLRDHUP | EPOLLET). The event is attached
 * as user data and its object/events fields are set to the descriptor and
 * EVENT_POLLIN.
 *
 * Returns the result of epoll_ctl.
 */
int ev_pollin(EventHandler *h, int fd, Event *event) {
    EventHandlerLinux *instance = (EventHandlerLinux*)h;
    struct epoll_event registration;

    event->object = (intptr_t)fd;
    event->events = EVENT_POLLIN;

    registration.events = EPOLLIN | EPOLLRDHUP | EPOLLET;
    registration.data.ptr = event;

    return epoll_ctl(instance->ep, EPOLL_CTL_ADD, fd, &registration);
}
/*
 * Registers `fd` at the instance's epoll descriptor for edge-triggered
 * write readiness (EPOLLOUT | EPOLLRDHUP | EPOLLET). The event is attached
 * as user data and its object/events fields are set to the descriptor and
 * EVENT_POLLOUT.
 *
 * Returns the result of epoll_ctl.
 */
int ev_pollout(EventHandler *h, int fd, Event *event) {
    EventHandlerLinux *instance = (EventHandlerLinux*)h;
    struct epoll_event registration;

    event->object = (intptr_t)fd;
    event->events = EVENT_POLLOUT;

    registration.events = EPOLLOUT | EPOLLRDHUP | EPOLLET;
    registration.data.ptr = event;

    return epoll_ctl(instance->ep, EPOLL_CTL_ADD, fd, &registration);
}
/*
 * Removes `fd` from the instance's epoll descriptor.
 *
 * Returns the result of epoll_ctl.
 */
int ev_remove_poll(EventHandler *h, int fd) {
    int epfd = ((EventHandlerLinux*)h)->ep;

    return epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
}
/*
 * Queues `event` for the given handler instance and wakes the instance up
 * by writing to its eventfd.
 *
 * The event's object and events fields are reset to 0. The event is appended
 * to the last queue block under queue_lock; if that block is full, a block
 * from the reserve is reused or a new one is allocated.
 *
 * Returns 0 on success and 1 if no queue block could be allocated or the
 * eventfd write failed. In the latter case the event has already been
 * queued, only the wake-up is missing.
 */
int event_send(EventHandler *h, Event *event) {
    EventHandlerLinux *ev = (EventHandlerLinux*)h;
    event->object = 0;
    event->events = 0;
    int err = 0;

    pthread_mutex_lock(&ev->queue_lock);

    EventQueue *block = ev->queue_end;
    if(block->numevents >= EV_MAX_EVENTS) {
        /* last block is full: get a fresh block */
        if(ev->num_reserve > 0) {
            block = ev->reserve_block[ev->num_reserve-1];
            ev->num_reserve--;
        }
        else {
            block = malloc(sizeof(EventQueue));
            if(!block) {
                err = 1;
            }
        }

        if(block) {
            ev->queue_end->next = block;
            ev->queue_end = block;
            block->numevents = 0;
            block->next = NULL;
        }
    }

    if(block) {
        block->events[block->numevents++] = event;
    }

    pthread_mutex_unlock(&ev->queue_lock);

    if(!err) {
        uint64_t data = 1;
        ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t));
        /* write reports failure with -1, not 0 */
        if(r < 0) {
            log_ereport(
                    LOG_FAILURE,
                    "eventhandler: failed to send event: %s",
                    strerror(errno));
            err = 1;
        }
    }

    return err;
}
/*
 * Performs the read described by `cb` with pread, stores the result (and
 * errno, if the read failed) in `cb` and then sends cb->event to
 * cb->evhandler.
 *
 * Returns the result of event_send.
 */
int ev_aioread(int fd, aiocb_s *cb) {
    ssize_t nread = pread(fd, cb->buf, cb->nbytes, cb->offset);

    if(nread < 0) {
        cb->result_errno = errno;
    }
    cb->result = nread;

    return event_send(cb->evhandler, cb->event);
}
/*
 * Performs the write described by `cb` with pwrite, stores the result (and
 * errno, if the write failed) in `cb` and then sends cb->event to
 * cb->evhandler.
 *
 * Returns the result of event_send.
 */
int ev_aiowrite(int fd, aiocb_s *cb) {
    ssize_t nwritten = pwrite(fd, cb->buf, cb->nbytes, cb->offset);

    if(nwritten < 0) {
        cb->result_errno = errno;
    }
    cb->result = nwritten;

    return event_send(cb->evhandler, cb->event);
}
/*
 * Requests read polling for `fd` by calling the stream's own poll function
 * with IO_POLL_IN. Returns that function's result.
 */
int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
    IOStream *stream = (IOStream*)fd;

    return stream->poll(fd, ev, IO_POLL_IN, event);
}
/*
 * Requests write polling for `fd` by calling the stream's own poll function
 * with IO_POLL_OUT. Returns that function's result.
 */
int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
    IOStream *stream = (IOStream*)fd;

    return stream->poll(fd, ev, IO_POLL_OUT, event);
}
/*
 * Calls the stream's own poll function with IO_POLL_NONE and no event.
 * Returns that function's result.
 */
int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
    IOStream *stream = (IOStream*)fd;

    return stream->poll(fd, ev, IO_POLL_NONE, NULL);
}