#include <stdio.h>
#include <stdlib.h>
#include <atomic.h>
#include "../util/io.h"
#include "event_solaris.h"
#include "httprequest.h"
/*
 * Creates an event handler with cfg->nthreads instances. Each instance gets
 * its own event port (port_create) and its own thread running
 * ev_handle_events().
 *
 * Returns the new EVHandler, or NULL if an allocation or port_create()
 * fails. On failure, instances that were already started are asked to shut
 * down and all memory owned by this function is released.
 */
EVHandler* evhandler_create(EventHandlerConfig *cfg) {
    EVHandler *ev = malloc(sizeof(EVHandler));
    if(!ev) {
        return NULL;
    }
    ev->current = 0;
    ev->numins = cfg->nthreads;
    ev->instances = calloc(cfg->nthreads, sizeof *ev->instances);
    // calloc may legitimately return NULL for a zero-sized request
    if(!ev->instances && cfg->nthreads > 0) {
        free(ev);
        return NULL;
    }

    int i;
    for(i=0;i<cfg->nthreads;i++) {
        EventHandlerSolaris *handler = calloc(1, sizeof(EventHandlerSolaris));
        if(!handler) {
            goto fail;
        }

        handler->port = port_create();
        if(handler->port < 0) {
            log_ereport(
                    LOG_FAILURE,
                    "evhandler_create: port_create: %s", strerror(errno));
            // no thread owns this handler yet, so it must be freed here
            free(handler);
            goto fail;
        }

        ev->instances[i] = (EventHandler*)handler;
        handler->thr = systhread_start(
                0,
                0,
                (thrstartfunc)ev_handle_events,
                handler);
    }

    return ev;

fail:
    // Instances 0..i-1 already have a running thread. Each thread frees its
    // own handler when its loop ends (see ev_handle_events), so they are
    // only asked to shut down and not freed here.
    for(int j=0;j<i;j++) {
        ev_instance_shutdown(ev->instances[j]);
    }
    free(ev->instances);
    free(ev);
    return NULL;
}
/*
 * Blocks until the event thread of the given handler instance has
 * terminated.
 *
 * NOTE(review): ev_handle_events() frees the handler when its loop ends, so
 * reading ev->thr here looks like it can race with that free -- confirm the
 * intended ownership with the callers.
 */
void ev_instance_wait(EventHandler *h) {
    // this is the Solaris backend: instances are EventHandlerSolaris objects
    EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
    // the thread's return value is not used
    pthread_join(ev->thr, NULL);
}
/*
 * Set once any instance has been closed on purpose. ev_handle_events() reads
 * it to decide whether a failing port_getn() should be logged. The flag is
 * file-wide, i.e. shared by all handler instances.
 */
static volatile int ev_close = 0;

/*
 * Closes the event port of a handler instance.
 */
void ev_instance_close(EventHandler *h) {
    EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
    // Raise the flag before closing the port, so an event thread whose
    // port_getn() fails because of the close already sees the flag and
    // does not log the failure as a catastrophe.
    ev_close = 1;
    close(ev->port);
}
/*
 * Marker event used to request a shutdown. It has no callback;
 * ev_handle_events() recognizes it by its address.
 */
static Event shutdown_event;

/*
 * Asks the event thread of a handler instance to shut down by sending the
 * shutdown marker event to its port. A failure to send is logged, because
 * otherwise the instance would silently keep running.
 */
void ev_instance_shutdown(EventHandler *h) {
    if(event_send(h, &shutdown_event)) {
        log_ereport(
                LOG_FAILURE,
                "ev_instance_shutdown: event_send: %s", strerror(errno));
    }
}
/*
 * Event loop of one handler instance; started as thread function by
 * evhandler_create().
 *
 * Repeatedly fetches events from the instance's event port and dispatches
 * them:
 *  - AIO completions: the result and errno are copied into the aiocb_s
 *    referenced by the event, the callback is invoked, and the request's
 *    aiocb_t and port_notify_t (both allocated by ev_aio) are released.
 *    Finish callbacks of AIO events are deferred until the whole batch has
 *    been dispatched.
 *  - all other events: the callback is invoked; a non-zero result
 *    re-associates the file descriptor with the port, a zero result calls
 *    the finish callback immediately.
 *  - the shutdown marker event (no callback) closes the port, which makes
 *    the next port_getn() fail and ends the loop.
 *
 * After each batch, nsapi_saf_return() is called for every entry of
 * ev->base.fnreturn (presumably filled by the callbacks -- verify against
 * the callers). The handler object is freed when the loop ends.
 */
void ev_handle_events(EventHandlerSolaris *ev) {
    EventHandler *h = (EventHandler*)ev;
    port_event_t events[EV_MAX_EVENTS];
    Event *finished[EV_MAX_EVENTS];

    struct timespec timeout;
    timeout.tv_nsec = 0;
    timeout.tv_sec = 600;

    for(;;) {
        // in: minimum number of events to wait for, out: events returned
        uint_t nev = 1;
        int ret = port_getn(ev->port, events, EV_MAX_EVENTS, &nev, &timeout);
        if(ret == -1) {
            if(errno != EINTR && errno != ETIME) {
                // a failure after a deliberate close is expected
                if(!ev_close) {
                    log_ereport(
                            LOG_CATASTROPHE,
                            "port_getn failed: %s", strerror(errno));
                }
                break;
            }
            // interrupted or timed out: just wait again
            continue;
        }

        int numfinished = 0;
        // reset the same counter that the return loop below reads
        ev->base.numret = 0;

        for(uint_t i=0;i<nev;i++) {
            Event *event = events[i].portev_user;

            if(events[i].portev_source == PORT_SOURCE_AIO) {
                aiocb_t *aiocb = (aiocb_t*)events[i].portev_object;
                if(event && aiocb) {
                    aiocb_s *aio = (aiocb_s*)event->object;
                    aio->result = aiocb->aio_resultp.aio_return;
                    aio->result_errno = aiocb->aio_resultp.aio_errno;
                    if(event->fn) {
                        if(!event->fn(h, event) && event->finish) {
                            finished[numfinished++] = event;
                        }
                    }
                }
                if(aiocb) {
                    // ev_aio stores its heap-allocated port_notify_t here;
                    // nothing else releases it
                    free(aiocb->aio_sigevent.sigev_value.sival_ptr);
                    free(aiocb);
                }
                continue;
            }

            if(!event) {
                // nothing to dispatch without a user event
                continue;
            }

            if(event->fn) {
                if(event->fn(h, event)) {
                    // the callback wants more events for this object
                    if(port_associate(
                            ev->port,
                            PORT_SOURCE_FD,
                            (uintptr_t)event->object,
                            ev_convert2sys_events(event->events),
                            event))
                    {
                        perror("port_associate");
                    }
                } else if(event->finish) {
                    event->finish(h, event);
                }
            } else if(event == &shutdown_event) {
                ev_instance_close(h);
            }
        }

        // deferred finish callbacks of completed AIO events
        for(int i=0;i<numfinished;i++) {
            Event *event = finished[i];
            if(event->finish) {
                event->finish(h, event);
            }
        }

        for(int i=0;i<ev->base.numret;i++) {
            EVReturn evret = ev->base.fnreturn[i];
            nsapi_saf_return(evret.sn, evret.rq, evret.ret);
        }
    }

    free(ev);
}
/*
 * Translates an EVENT_POLLIN / EVENT_POLLOUT bit mask into the matching
 * POLLIN / POLLOUT bit mask. Other bits in events are ignored.
 */
int ev_convert2sys_events(int events) {
    int sysevents = ((events & EVENT_POLLIN) == EVENT_POLLIN) ? POLLIN : 0;
    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
        sysevents |= POLLOUT;
    }
    return sysevents;
}
/*
 * Registers fd for POLLIN events on the handler's event port.
 *
 * The event is updated to describe the registration (object = fd,
 * events = EVENT_POLLIN) and is passed to port_associate() as user data.
 * Returns the result of port_associate().
 */
int ev_pollin(EventHandler *h, int fd, Event *event) {
    EventHandlerSolaris *solaris = (EventHandlerSolaris*)h;
    const uintptr_t object = (uintptr_t)fd;

    event->object = (intptr_t)fd;
    event->events = EVENT_POLLIN;

    return port_associate(solaris->port, PORT_SOURCE_FD, object, POLLIN, event);
}
/*
 * Registers fd for POLLOUT events on the handler's event port.
 *
 * The event is updated to describe the registration (object = fd,
 * events = EVENT_POLLOUT) and is passed to port_associate() as user data.
 * Returns the result of port_associate().
 */
int ev_pollout(EventHandler *h, int fd, Event *event) {
    EventHandlerSolaris *solaris = (EventHandlerSolaris*)h;
    const uintptr_t object = (uintptr_t)fd;

    event->object = (intptr_t)fd;
    event->events = EVENT_POLLOUT;

    return port_associate(solaris->port, PORT_SOURCE_FD, object, POLLOUT, event);
}
/*
 * Removes the association of fd with the handler's event port.
 * Returns the result of port_dissociate().
 */
int ev_remove_poll(EventHandler *h, int fd) {
    EventHandlerSolaris *solaris = (EventHandlerSolaris*)h;
    const uintptr_t object = (uintptr_t)fd;
    return port_dissociate(solaris->port, PORT_SOURCE_FD, object);
}
/*
 * Sends a user event to the handler's event port.
 *
 * The event's object and events fields are reset to 0 before the event is
 * handed to port_send() as user data. Returns the result of port_send().
 */
int event_send(EventHandler *h, Event *event) {
    EventHandlerSolaris *solaris = (EventHandlerSolaris*)h;

    event->events = 0;
    event->object = 0;

    return port_send(solaris->port, 0, event);
}
/*
 * Submits an asynchronous read (read != 0) or write (read == 0) for fd,
 * described by cb (buf, nbytes, offset). Completion is reported to the event
 * port of cb->evhandler with cb->event as user data.
 *
 * The aiocb_t and port_notify_t allocated here stay alive while the request
 * is in flight; the aiocb_t is freed by ev_handle_events() when the
 * completion event arrives. If the request cannot be submitted, no
 * completion event will follow, so both are released here.
 *
 * Returns the result of aio_read()/aio_write(), or -1 if cb has no event
 * handler or an allocation fails.
 */
static int ev_aio(int fd, aiocb_s *cb, WSBool read) {
    EventHandlerSolaris *ev = (EventHandlerSolaris*)cb->evhandler;
    if(!ev) {
        return -1;
    }

    aiocb_t *aiocb = malloc(sizeof(aiocb_t));
    if(!aiocb) {
        return -1;
    }
    ZERO(aiocb, sizeof(aiocb_t));

    aiocb->aio_fildes = fd;
    aiocb->aio_buf = cb->buf;
    aiocb->aio_nbytes = cb->nbytes;
    aiocb->aio_offset = cb->offset;

    port_notify_t *portnotify = malloc(sizeof(port_notify_t));
    if(!portnotify) {
        free(aiocb);
        return -1;
    }
    portnotify->portnfy_port = ev->port;
    portnotify->portnfy_user = cb->event;
    aiocb->aio_sigevent.sigev_notify = SIGEV_PORT;
    aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify;

    int ret = read ? aio_read(aiocb) : aio_write(aiocb);
    if(ret != 0) {
        // keep the errno of the failed submission for the caller
        int submit_errno = errno;
        free(portnotify);
        free(aiocb);
        errno = submit_errno;
    }
    return ret;
}
/*
 * Starts an asynchronous read on fd as described by cb.
 * Returns the result of ev_aio().
 */
int ev_aioread(int fd, aiocb_s *cb) {
    const WSBool is_read = TRUE;
    return ev_aio(fd, cb, is_read);
}
/*
 * Starts an asynchronous write on fd as described by cb.
 * Returns the result of ev_aio().
 */
int ev_aiowrite(int fd, aiocb_s *cb) {
    const WSBool is_read = FALSE;
    return ev_aio(fd, cb, is_read);
}
/*
 * Requests input polling for fd by calling the poll function of the
 * IOStream behind fd with IO_POLL_IN. Returns that function's result.
 */
int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
    IOStream *stream = (IOStream*)fd;
    return stream->poll(fd, ev, IO_POLL_IN, event);
}
/*
 * Requests output polling for fd by calling the poll function of the
 * IOStream behind fd with IO_POLL_OUT. Returns that function's result.
 */
int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
    IOStream *stream = (IOStream*)fd;
    return stream->poll(fd, ev, IO_POLL_OUT, event);
}
/*
 * Stops polling for fd by calling the poll function of the IOStream behind
 * fd with IO_POLL_NONE and no event. Returns that function's result.
 */
int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
    IOStream *stream = (IOStream*)fd;
    return stream->poll(fd, ev, IO_POLL_NONE, NULL);
}