#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include "../public/nsapi.h"
#include "sessionhandler.h"
#include "httprequest.h"
#include "httpparser.h"
#include "log.h"
#include "error.h"
#include "httplistener.h"
/*
 * Per-request state shared by the event-driven request callbacks in this
 * file. It is stored in Event.cookie (and in EVWatchList.data1 for the
 * request timeout watch).
 */
struct EventHttpIO {
/* request being read; request->connection and request->netbuf are used */
HTTPRequest *request;
/* parser created for `request` by http_parser_new() */
HttpParser *parser;
/* request timeout watch; watch.intdata != 0 while it is registered */
EVWatchList watch;
/* I/O event polled for this request (set by evt_enq_conn) */
Event *io_event;
/*
 * Error code set by the callbacks in this file:
 * 0 = none, 1 = read / SSL accept error, 2 = http parser error,
 * 3 = fcntl error, 4 = request timeout
 */
int error;
};
/*
 * Read callback for plain (non-SSL) connections.
 *
 * Reads up to len bytes from conn->fd into buf and returns the result of
 * read(2) converted to int.
 */
int connection_read(Connection *conn, void *buf, int len) {
    int nread = (int)read(conn->fd, buf, len);
    return nread;
}
/*
 * Write callback for plain (non-SSL) connections.
 *
 * Writes up to len bytes from buf to conn->fd and returns the result of
 * write(2) converted to int.
 */
int connection_write(Connection *conn, const void *buf, int len) {
    int nwritten = (int)write(conn->fd, buf, len);
    return nwritten;
}
/*
 * Close callback for plain (non-SSL) connections.
 *
 * Closes conn->fd. The close is retried as long as it fails with EINTR;
 * any other failure is logged and ends the attempt.
 *
 * NOTE(review): on some platforms (e.g. Linux) the descriptor is already
 * released when close() fails with EINTR, so a retry may hit a descriptor
 * reused by another thread - confirm the retry is wanted on all targets.
 */
void connection_close(Connection *conn) {
    for (;;) {
        if (close(conn->fd) == 0) {
            return;
        }
        if (errno == EINTR) {
            log_ereport(LOG_VERBOSE, "connection close: EINTR");
            continue;
        }
        log_ereport(LOG_VERBOSE, "connection close failed: %s", strerror(errno));
        return;
    }
}
/*
 * Read callback for SSL connections.
 *
 * Returns the result of SSL_read(). When that result is <= 0 the matching
 * SSL_get_error() code is stored in conn->ssl_error before returning.
 */
int connection_ssl_read(Connection *conn, void *buf, int len) {
    int nread = SSL_read(conn->ssl, buf, len);
    if (nread > 0) {
        return nread;
    }
    conn->ssl_error = SSL_get_error(conn->ssl, nread);
    return nread;
}
/*
 * Write callback for SSL connections.
 *
 * Returns the result of SSL_write(). When that result is <= 0 the matching
 * SSL_get_error() code is stored in conn->ssl_error before returning.
 */
int connection_ssl_write(Connection *conn, const void *buf, int len) {
    int nwritten = SSL_write(conn->ssl, buf, len);
    if (nwritten > 0) {
        return nwritten;
    }
    conn->ssl_error = SSL_get_error(conn->ssl, nwritten);
    return nwritten;
}
/*
 * Close callback for SSL connections.
 *
 * If no SSL error has been recorded on the connection, a TLS shutdown is
 * attempted first; afterwards the file descriptor is closed with the same
 * logic as for plain connections.
 */
void connection_ssl_close(Connection *conn) {
    if (!conn->ssl_error) {
        int ret = SSL_shutdown(conn->ssl);
        // SSL_shutdown() returns 0 when our close_notify was sent but the
        // peer's has not been received yet. That is not an error and
        // SSL_get_error() must not be consulted for it; only a negative
        // return value indicates a failure.
        if (ret < 0) {
            conn->ssl_error = SSL_get_error(conn->ssl, ret);
            log_ereport(LOG_VERBOSE, "SSL_shutdown failed: %d", conn->ssl_error);
        }
    }

    // same fd close handling (including logging) as plain connections
    connection_close(conn);
}
/*
 * Tears down a connection: drops the reference on the listener's config,
 * invokes the connection's close callback, frees the SSL object if there
 * is one and finally frees the Connection itself.
 *
 * conn must not be used after this call.
 */
void connection_destroy(Connection *conn) {
    unsigned long long int reqid = (unsigned long long int)conn->id;
    log_ereport(LOG_DEBUG, "trace reqid: %016llx connection_destroy", reqid);

    cfg_unref(conn->listener->cfg);
    conn->close(conn);

    if (conn->ssl != NULL) {
        SSL_free(conn->ssl);
    }
    free(conn);
}
/*
 * Creates the IOStream for a connection.
 *
 * For connections with an SSL object an SSL stream is created and *ssl is
 * set to 1; otherwise a system stream on conn->fd is created and *ssl is
 * set to 0. The sh argument is not used.
 *
 * Returns whatever the stream constructor returned.
 */
IOStream* create_connection_iostream(
        SessionHandler *sh,
        Connection *conn,
        pool_handle_t *pool,
        WSBool *ssl)
{
    IOStream *stream;

    if (!conn->ssl) {
        stream = Sysstream_new(pool, conn->fd);
        *ssl = 0;
        return stream;
    }

    stream = sslstream_new(pool, conn->ssl);
    *ssl = 1;
    return stream;
}
/*
 * Creates the thread-pool based session handler.
 *
 * The handler is allocated from pool, gets a started thread pool created
 * with threadpool_new(4, 8) and the basic_* callbacks.
 *
 * Returns the handler, or NULL if the allocation failed.
 */
SessionHandler* create_basic_session_handler(pool_handle_t *pool) {
    BasicSessionHandler *handler = pool_malloc(pool, sizeof(BasicSessionHandler));
    if (!handler) {
        // do not write through a NULL pointer when the pool is exhausted
        return NULL;
    }

    handler->threadpool = threadpool_new(4, 8);
    threadpool_start(handler->threadpool);

    handler->sh.enqueue_connection = basic_enq_conn;
    handler->sh.keep_alive = basic_keep_alive;
    handler->sh.create_iostream = create_connection_iostream;

    return (SessionHandler*)handler;
}
/*
 * enqueue_connection callback of the basic session handler: remembers the
 * handler in the connection and schedules basic_run_session for it on the
 * handler's thread pool.
 */
void basic_enq_conn(SessionHandler *handler, Connection *conn) {
    conn->session_handler = handler;
    threadpool_run(
            ((BasicSessionHandler*)handler)->threadpool,
            basic_run_session,
            conn);
}
void* basic_run_session(
void *data) {
Connection *conn = (Connection*)data;
HTTPRequest *request = malloc(
sizeof(HTTPRequest));
http_request_init(request);
request->connection = conn;
netbuf *buf = malloc(
sizeof(netbuf));
buf->rdtimeout =
120;
buf->pos =
0;
buf->cursize =
0;
buf->maxsize =
2048;
buf->sd = &conn->fd;
buf->inbuf = malloc(
2048);
buf->errmsg =
NULL;
request->netbuf = buf;
HttpParser *parser = http_parser_new(request);
int state;
int r;
r = conn->read(conn, buf->inbuf + buf->pos, buf->maxsize - buf->pos);
if(r >
0) {
int err =
0;
buf->cursize += r;
while((state = http_parser_process(parser)) !=
0) {
if(state ==
2) {
log_ereport(
LOG_FAILURE,
"basic_run_session: invalid http request");
err =
1;
break;
}
r = conn->read(conn, buf->inbuf + buf->pos, buf->maxsize - buf->pos);
if(r == -
1) {
log_ereport(
LOG_FAILURE,
"basic_run_session: IO error: %s", strerror(errno));
err =
1;
break;
}
buf->cursize += r;
}
if(!err) {
if(http_parser_validate(parser)) {
r = handle_request(request,
NULL,
NULL);
}
else {
log_ereport(
LOG_FAILURE,
"basic_run_session: http parser validation failed");
fatal_error(request,
400);
}
}
}
else {
log_ereport(
LOG_FAILURE,
"basic_run_session: IO error: %s", strerror(errno));
}
free(buf->inbuf);
free(buf);
connection_destroy(conn);
http_parser_free(parser);
http_request_cleanup(request);
return NULL;
}
/*
 * keep_alive callback of the basic session handler. The connection is not
 * kept open: it is simply destroyed. The handler argument is not used.
 */
void basic_keep_alive(SessionHandler *handler, Connection *conn) {
    (void)handler;
    connection_destroy(conn);
}
/*
 * Creates the event-driven session handler.
 *
 * The handler is allocated from pool, uses the default event handler and
 * the evt_* callbacks.
 *
 * Returns the handler, or NULL if the allocation failed.
 */
SessionHandler* create_event_session_handler(pool_handle_t *pool) {
    EventSessionHandler *handler = pool_malloc(pool, sizeof(EventSessionHandler));
    if (!handler) {
        // do not write through a NULL pointer when the pool is exhausted
        return NULL;
    }

    handler->eventhandler = get_default_event_handler();
    handler->sh.enqueue_connection = evt_enq_conn;
    handler->sh.keep_alive = evt_keep_alive;
    handler->sh.create_iostream = create_connection_iostream;

    return (SessionHandler*)handler;
}
/*
 * Destroy callback of the request timeout watch (installed by
 * evt_add_request).
 *
 * item->data1 is the EventHttpIO of the request. The watch is marked as
 * no longer registered (intdata = 0) so that evt_request_error does not
 * try to remove it, the error code is set to 4, the connection's fd is
 * removed from polling and the request is torn down via
 * evt_request_error.
 */
void evt_request_timeout(EventHandler *h, EVWatchList *item) {
    log_ereport(LOG_VERBOSE, "sessionhandler: request timeout");

    EventHttpIO *io = item->data1;
    item->intdata = 0;
    io->error = 4;

    Connection *conn = io->request->connection;
    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx request timeout",
            (unsigned long long int)conn->id);

    if (ev_remove_poll(h, conn->fd) != 0) {
        log_ereport(LOG_FAILURE, "sessionhandler: request timeout: cannot remove poll");
    }

    evt_request_error(h, io->io_event);
}
/*
 * Callback of the start event sent by evt_enq_conn; runs in the event
 * handler h.
 *
 * Registers the request's I/O event for input polling on the connection's
 * fd and installs a request timeout watch that expires 240 seconds after
 * creation. If polling cannot be set up, the request is torn down.
 *
 * event is the start event (cookie = EventHttpIO). It is not freed here:
 * its finish callback (ev_free_event, set in evt_enq_conn) does that.
 *
 * Always returns 0.
 */
int evt_add_request(EventHandler *h, Event *event) {
    EventHttpIO *io = event->cookie;
    Connection *conn = io->request->connection;

    if (ev_pollin(h, conn->fd, io->io_event) != 0) {
        log_ereport(LOG_FAILURE, "Cannot enqueue connection");
        // evt_request_error() frees the event it is given. It must get the
        // request's I/O event, not this start event: the start event is
        // still released by its own finish callback, so passing it here
        // would free it twice and leak io->io_event.
        evt_request_error(h, io->io_event);
    } else {
        // add request timeout
        io->watch.intdata = 1; // marks the watch as registered
        io->watch.created = time(NULL);
        io->watch.expire = io->watch.created + 240;
        io->watch.destroy = evt_request_timeout;
        io->watch.data1 = io;
        ev_watchlist_add(h, &io->watch);
    }

    return 0;
}
/*
 * enqueue_connection callback of the event session handler.
 *
 * Allocates the request state (evt_req_init), an I/O event for reading
 * the request and a start event. The start event is sent to an event
 * handler instance, where evt_add_request registers the I/O event.
 *
 * On any failure the connection is destroyed and everything allocated
 * here is released.
 */
void evt_enq_conn(SessionHandler *handler, Connection *conn) {
    const unsigned long long int reqid = (unsigned long long int)conn->id;
    log_ereport(LOG_DEBUG, "trace reqid: %016llx enqueue connection", reqid);

    Event *starter = malloc(sizeof *starter);
    if (starter == NULL) {
        connection_destroy(conn);
        return;
    }

    Event *io_event = malloc(sizeof *io_event);
    if (io_event == NULL) {
        connection_destroy(conn);
        free(starter);
        return;
    }

    EventHttpIO *io = evt_req_init(handler, conn);
    if (io == NULL) {
        log_ereport(LOG_DEBUG, "trace reqid: %016llx connection destroyed | evt_req_init failed", reqid);
        log_ereport(LOG_DEBUG, "trace reqid: %016llx free event", reqid);
        connection_destroy(conn);
        free(starter);
        free(io_event);
        return;
    }

    // I/O event: performs the SSL handshake first if it is still pending
    ZERO(io_event, sizeof(Event));
    if (conn->ssl && !conn->ssl_accepted) {
        io_event->fn = evt_request_ssl_accept;
    } else {
        io_event->fn = evt_request_input;
    }
    io_event->finish = evt_request_finish;
    io_event->cookie = io;
    io->io_event = io_event;

    EventHandler *ev = ev_instance(((EventSessionHandler*)handler)->eventhandler);

    // start event: registers the I/O event from within the event handler
    ZERO(starter, sizeof(Event));
    starter->cookie = io;
    starter->fn = evt_add_request;
    starter->finish = ev_free_event;
    starter->error = 0;

    if (event_send(ev, starter) != 0) {
        log_ereport(LOG_DEBUG, "trace reqid: %016llx connection destroyed | event_send failed", reqid);
        log_ereport(LOG_FAILURE, "Cannot start request timeout: event_send failed");
        // releases io, io_event and the connection
        evt_request_error(ev, io_event);
        free(starter);
    }
}
/*
 * Prepares a connection for reading a new request in the event loop.
 *
 * Switches conn->fd to non-blocking mode and allocates the request, its
 * 2048 byte input buffer, the parser and the EventHttpIO that ties them
 * together. conn->session_handler is set to handler.
 *
 * The returned object has error = 0, a zeroed watch and io_event = NULL;
 * the caller is expected to set io_event.
 *
 * Returns NULL on failure. Everything allocated here is released in that
 * case; the connection itself is left to the caller.
 */
EventHttpIO* evt_req_init(SessionHandler *handler, Connection *conn) {
    // set socket non blocking
    int flags;
    if ((flags = fcntl(conn->fd, F_GETFL, 0)) == -1) {
        flags = 0;
    }
    if (fcntl(conn->fd, F_SETFL, flags | O_NONBLOCK) != 0) {
        log_ereport(LOG_FAILURE, "sessionhandler: fcntl failed: %s", strerror(errno));
        return NULL;
    }

    HTTPRequest *request = malloc(sizeof(HTTPRequest));
    if (!request) {
        return NULL;
    }
    http_request_init(request);
    request->connection = conn;
    conn->session_handler = handler;

    // prepare input buffer for the request header
    netbuf *buf = malloc(sizeof(netbuf));
    if (!buf) {
        http_request_cleanup(request);
        return NULL;
    }
    buf->rdtimeout = 120;
    buf->pos = 0;
    buf->cursize = 0;
    buf->maxsize = 2048;
    buf->sd = &conn->fd;
    buf->errmsg = NULL;
    buf->inbuf = malloc(2048);
    if (!buf->inbuf) {
        http_request_cleanup(request);
        free(buf);
        return NULL;
    }

    request->netbuf = buf;

    HttpParser *parser = http_parser_new(request);
    if (!parser) {
        http_request_cleanup(request);
        free(buf->inbuf);
        free(buf);
        return NULL;
    }

    EventHttpIO *io = malloc(sizeof(EventHttpIO));
    if (io == NULL) {
        http_request_cleanup(request);
        free(buf->inbuf);
        free(buf);
        http_parser_free(parser);
        return NULL;
    }
    io->request = request;
    io->parser = parser;
    // io comes from malloc: give io_event a defined value so that a caller
    // that forgets to set it cannot leave an indeterminate pointer behind
    io->io_event = NULL;
    io->error = 0;
    ZERO(&io->watch, sizeof(EVWatchList));

    return io;
}
/*
 * Event callback that performs the SSL handshake for a new connection.
 *
 * On success the connection is marked as accepted, the event's callback
 * is switched to evt_request_input and that function is called directly.
 *
 * If the handshake needs more I/O, event->events is set to the wanted
 * direction and 1 is returned. Any other error is logged, io->error is
 * set to 1, the finish callback is replaced by evt_request_error and 0 is
 * returned.
 */
int evt_request_ssl_accept(EventHandler *handler, Event *event) {
    EventHttpIO *io = event->cookie;
    Connection *conn = io->request->connection;
    const unsigned long long int reqid = (unsigned long long int)conn->id;

    log_ereport(LOG_DEBUG, "trace reqid: %016llx ssl accept", reqid);

    int rc = SSL_accept(conn->ssl);
    if (rc > 0) {
        conn->ssl_accepted = WS_TRUE;
        event->fn = evt_request_input;
        return evt_request_input(handler, event);
    }

    log_ereport(LOG_DEBUG, "trace reqid: %016llx ssl accept | error", reqid);

    int ssl_err = SSL_get_error(conn->ssl, rc);

    // handshake not finished yet: wait for the requested direction
    if (ssl_err == SSL_ERROR_WANT_READ) {
        event->events = EVENT_POLLIN;
        return 1;
    }
    if (ssl_err == SSL_ERROR_WANT_WRITE) {
        event->events = EVENT_POLLOUT;
        return 1;
    }

    // real error: translate the code to a name for the log
    const char *err_name = "unknown";
    switch (ssl_err) {
        case SSL_ERROR_ZERO_RETURN:      err_name = "SSL_ERROR_ZERO_RETURN"; break;
        case SSL_ERROR_WANT_CONNECT:     err_name = "SSL_ERROR_WANT_CONNECT"; break;
        case SSL_ERROR_WANT_ACCEPT:      err_name = "SSL_ERROR_WANT_ACCEPT"; break;
        case SSL_ERROR_WANT_X509_LOOKUP: err_name = "SSL_ERROR_WANT_X509_LOOKUP"; break;
        case SSL_ERROR_SYSCALL:          err_name = "SSL_ERROR_SYSCALL"; break;
        case SSL_ERROR_SSL:              err_name = "SSL_ERROR_SSL"; break;
        default: break;
    }
    log_ereport(LOG_VERBOSE, "SSL accept error[%d]: %s", ssl_err, err_name);

    event->finish = evt_request_error;
    io->error = 1;
    return 0;
}
/*
 * Event callback that reads request data and feeds the http parser.
 *
 * Return value / side effects:
 *  - 1 with event->events set: more I/O is needed (SSL want read/write,
 *    or the parser needs more input)
 *  - 0 with event->finish = evt_request_error: read error (io->error 1),
 *    parser error (io->error 2, 400 sent via fatal_error), fcntl error
 *    (io->error 3) or failed validation (400 sent via fatal_error)
 *  - 0 with the finish callback untouched: the request header is complete
 *    and valid; the socket has been switched back to blocking mode
 *
 * The handler argument is not used.
 */
int evt_request_input(EventHandler *handler, Event *event) {
    EventHttpIO *io = event->cookie;
    HTTPRequest *request = io->request;
    HttpParser *parser = io->parser;
    Connection *conn = request->connection;
    netbuf *nb = request->netbuf;

    int nread = conn->read(conn, nb->inbuf + nb->pos, nb->maxsize - nb->pos);
    const unsigned long long int reqid = (unsigned long long int)conn->id;
    log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | r=%d", reqid, nread);

    if (nread <= 0) {
        if (conn->ssl) {
            // SSL may just need another round of I/O
            if (conn->ssl_error == SSL_ERROR_WANT_READ) {
                event->events = EVENT_POLLIN;
                log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | ssl want read", reqid);
                return 1;
            }
            if (conn->ssl_error == SSL_ERROR_WANT_WRITE) {
                event->events = EVENT_POLLOUT;
                log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | ssl want write", reqid);
                return 1;
            }
        }

        log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | error", reqid);
        event->finish = evt_request_error;
        io->error = 1;
        return 0;
    }

    nb->cursize += nread;

    switch (http_parser_process(parser)) {
        case 2: {
            // parse error
            log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | http parser error", reqid);
            fatal_error(request, 400);
            log_ereport(LOG_VERBOSE, "http parser: bad request");
            event->finish = evt_request_error;
            io->error = 2;
            return 0;
        }
        case 1: {
            // header incomplete: wait for more input
            log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | event_pollin", reqid);
            event->events = EVENT_POLLIN;
            return 1;
        }
        default: break;
    }

    // header complete: switch the socket back to blocking mode
    int fl = fcntl(conn->fd, F_GETFL, 0);
    if (fl == -1) {
        fl = 0;
    }
    if (fcntl(conn->fd, F_SETFL, fl & ~O_NONBLOCK) != 0) {
        log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | fcntl error", reqid);
        event->finish = evt_request_error;
        io->error = 3;
        return 0;
    }

    if (http_parser_validate(parser)) {
        return 0;
    }

    log_ereport(LOG_DEBUG, "trace reqid: %016llx request input | http parser validate error", reqid);
    log_ereport(LOG_FAILURE, "http_parser_validate failed");
    fatal_error(request, 400);
    event->finish = evt_request_error;
    return 0;
}
/*
 * Finish callback of a request I/O event whose header was read
 * successfully.
 *
 * Removes the request timeout watch if it is registered and passes the
 * request to handle_request(). Only if that fails, the connection and the
 * input buffer are released here. The request, parser, EventHttpIO and
 * the event itself are always released.
 *
 * Always returns 0.
 */
int evt_request_finish(EventHandler *h, Event *event) {
    EventHttpIO *io = event->cookie;
    HTTPRequest *request = io->request;
    HttpParser *parser = io->parser;

    // remember the id: the connection may be gone when it is logged last
    uint64_t reqid = request->connection->id;
    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx request finish",
            (unsigned long long int)reqid);

    if (io->watch.intdata) {
        ev_watchlist_remove(h, &io->watch);
    }

    if (handle_request(request, NULL, h) != 0) {
        log_ereport(
                LOG_DEBUG,
                "trace reqid: %016llx request finish | handle_request failed",
                (unsigned long long int)request->connection->id);
        connection_destroy(request->connection);
        netbuf *nb = request->netbuf;
        free(nb->inbuf);
        free(nb);
    }

    http_request_cleanup(request);
    http_parser_free(parser);
    free(io);
    free(event);

    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx free event",
            (unsigned long long int)reqid);
    return 0;
}
/*
 * Tears down a request that failed while it was being read.
 *
 * event->cookie must be the request's EventHttpIO. The timeout watch is
 * removed if it is registered; the input buffer, the connection, the
 * request, the parser, the EventHttpIO and the event itself are released.
 *
 * Always returns 0.
 */
int evt_request_error(EventHandler *h, Event *event) {
    EventHttpIO *io = event->cookie;
    HTTPRequest *request = io->request;
    HttpParser *parser = io->parser;
    Connection *conn = request->connection;

    // remember the id: the connection is gone when it is logged last
    uint64_t reqid = conn->id;
    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx request error",
            (unsigned long long int)reqid);

    // NOTE(review): the condition tests event->error while the message
    // prints io->error - confirm this is intended
    if (event->error) {
        log_ereport(
                LOG_VERBOSE,
                "sessionhandler http io error: %d fd: %d",
                io->error,
                conn->fd);
    }

    if (io->watch.intdata) {
        ev_watchlist_remove(h, &io->watch);
    }

    netbuf *nb = request->netbuf;
    free(nb->inbuf);
    free(nb);

    connection_destroy(conn);
    http_request_cleanup(request);
    http_parser_free(parser);
    free(io);
    free(event);

    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx free event",
            (unsigned long long int)reqid);
    return 0;
}
/*
 * keep_alive callback of the event session handler.
 *
 * Increments the connection id (used as request id in trace logs) and
 * sends an event to an event handler instance, where
 * evt_keep_alive_enqueue sets up waiting for the next request.
 *
 * If the event cannot be allocated or sent, the connection is destroyed.
 */
void evt_keep_alive(SessionHandler *handler, Connection *conn) {
    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx keep alive",
            (unsigned long long int)conn->id);
    conn->id++;

    Event *ka_event = malloc(sizeof *ka_event);
    if (ka_event == NULL) {
        connection_destroy(conn);
        return;
    }
    ZERO(ka_event, sizeof(Event));
    ka_event->fn = evt_keep_alive_enqueue;
    ka_event->finish = ev_free_event;
    ka_event->cookie = conn;

    EventHandler *ev = ev_instance(((EventSessionHandler*)handler)->eventhandler);
    if (event_send(ev, ka_event) == 0) {
        return;
    }

    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx connection destroyed | keep_alive event error",
            (unsigned long long int)conn->id);
    log_ereport(LOG_FAILURE, "Keep-Alive: ev_send failed");
    connection_destroy(conn);
    free(ka_event);
}
/*
 * Event callback sent by evt_keep_alive; event->cookie is the Connection.
 *
 * Registers a keep-alive watch that expires 120 seconds after creation
 * (destroy callback: evt_keep_alive_destroy) and starts polling the
 * connection for input with an event that runs
 * evt_keep_alive_input_event.
 *
 * On failure the connection is destroyed and everything allocated here is
 * released. Always returns 0.
 */
int evt_keep_alive_enqueue(EventHandler *h, Event *event) {
    Connection *conn = event->cookie;

    EVWatchList *watch = malloc(sizeof *watch);
    if (watch == NULL) {
        connection_destroy(conn);
        return 0;
    }

    Event *input_event = malloc(sizeof *input_event);
    if (input_event == NULL) {
        connection_destroy(conn);
        free(watch);
        return 0;
    }

    // keep-alive timeout
    ZERO(watch, sizeof(EVWatchList));
    watch->data1 = conn;
    watch->data2 = input_event;
    watch->destroy = evt_keep_alive_destroy;
    watch->created = time(NULL);
    watch->expire = watch->created + 120;
    ev_watchlist_add(h, watch);

    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx keep_alive add watch",
            (unsigned long long int)conn->id);

    // wait for the next request on this connection
    ZERO(input_event, sizeof(Event));
    input_event->fn = evt_keep_alive_input_event;
    input_event->finish = ev_free_event;
    input_event->cookie = watch;

    if (ev_pollin(h, conn->fd, input_event) == 0) {
        return 0;
    }

    const unsigned long long int reqid = (unsigned long long int)conn->id;
    log_ereport(LOG_DEBUG, "trace reqid: %016llx connection destroyed | keep_alive_enqueue", reqid);
    log_ereport(LOG_DEBUG, "trace reqid: %016llx keep_alive free watch", reqid);
    log_ereport(LOG_FAILURE, "Cannot enqueue connection");

    ev_watchlist_remove(h, watch);
    connection_destroy(conn);
    free(watch);
    free(input_event);
    return 0;
}
/*
 * Event callback for input on a kept-alive connection.
 *
 * event->cookie is the keep-alive watch. The watch is removed and freed,
 * new request state is created with evt_req_init and the same event is
 * turned into the request's I/O event (callback: SSL accept or request
 * input, finish: evt_request_finish). The request callback is then run
 * immediately and its result is returned.
 *
 * If the request state cannot be created, the connection is destroyed and
 * 0 is returned.
 */
int evt_keep_alive_input_event(EventHandler *h, Event *event) {
    EVWatchList *keepalive = event->cookie;
    Connection *conn = keepalive->data1;

    // input arrived: the keep-alive timeout is no longer needed
    ev_watchlist_remove(h, keepalive);
    log_ereport(
            LOG_DEBUG,
            "trace reqid: %016llx keep_alive free watch",
            (unsigned long long int)conn->id);
    free(keepalive);

    // prepare http io
    EventHttpIO *io = evt_req_init(conn->session_handler, conn);
    if (!io) {
        log_ereport(
                LOG_DEBUG,
                "trace reqid: %016llx connection destroyed | keep alive input",
                (unsigned long long int)conn->id);
        connection_destroy(conn);
        return 0;
    }

    // This event now serves as the request's I/O event. Record that in the
    // EventHttpIO, as evt_enq_conn does for new connections; otherwise
    // io->io_event would never be assigned on the keep-alive path.
    io->io_event = event;

    event->cookie = io;
    event->fn = conn->ssl && !conn->ssl_accepted ? evt_request_ssl_accept : evt_request_input;
    event->finish = evt_request_finish;

    return event->fn(h, event);
}
/*
 * Destroy callback of the keep-alive watch (keep-alive timeout).
 *
 * item->data1 is the Connection, item->data2 the input event that was
 * registered for it. The fd is removed from polling, the connection is
 * destroyed and both the input event and the watch item are freed.
 */
void evt_keep_alive_destroy(EventHandler *h, EVWatchList *item) {
    Connection *conn = item->data1;
    Event *input_event = item->data2;

    log_ereport(LOG_DEBUG, "sessionhandler: keep-alive timeout: close connection");

    if (ev_remove_poll(h, conn->fd) != 0) {
        log_ereport(LOG_FAILURE, "sessionhandler: keep-alive timeout: cannot remove poll");
    }

    const unsigned long long int reqid = (unsigned long long int)conn->id;
    log_ereport(LOG_DEBUG, "trace reqid: %016llx keep_alive timeout", reqid);
    log_ereport(LOG_DEBUG, "trace reqid: %016llx keep_alive free watch", reqid);

    connection_destroy(conn);
    free(input_event);
    free(item);
}