#include "../public/nsapi.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/shm.h>
#include <sys/ipc.h>
#include <sys/file.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <strings.h>
#include <stdbool.h>
#include <signal.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <openssl/rand.h>
#include <cx/hash_map.h>
#include "../util/atomic.h"
#include "httplistener.h"
#include "netsite.h"
#include "session.h"
#include "configmanager.h"
#include "log.h"
/* Token limit passed to cx_strsplit_a() when splitting a listener's disable_proto list. */
#define LISTENER_MAX_PROTOCOL_TOKENS 1024
/* Protocol names handed to get_socket(); they also form the first part of the socket map key. */
#define LISTENER_PROTO_IPV4 "ipv4"
#define LISTENER_PROTO_IPV6 "ipv6"
/* Maps "<protocol>:<port>" keys to WSSocket objects; created by http_listener_global_init(). */
static CxMap *listener_socket_map;
/* Held by http_listener_create() for the duration of listener_create(). */
static pthread_mutex_t listener_mutex =
PTHREAD_MUTEX_INITIALIZER;
/*
 * Creates the file-global socket map used by get_socket().
 *
 * Returns 0 on success and 1 if the map could not be created.
 */
int http_listener_global_init(void) {
    listener_socket_map = cxHashMapCreate(cxDefaultAllocator, CX_STORE_POINTERS, 4);
    return listener_socket_map ? 0 : 1;
}
int start_all_listener() {
ServerConfiguration *conf = cfgmgr_get_server_config();
CxList *ls = conf->listeners;
CxIterator iter = cxListIterator(ls);
cx_foreach(HttpListener *, listener, iter) {
http_listener_start(listener);
}
return 0;
}
/*
 * Builds an HttpSSL object (wrapping a new SSL_CTX) from a listener
 * configuration.
 *
 * Steps:
 *  - create the context with SSLv3 disabled by default,
 *  - disable every protocol named in the comma separated
 *    conf->disable_proto list (names are compared case-insensitively after
 *    trimming); an unknown name is a configuration error,
 *  - load the certificate chain file or, if none is configured, the
 *    certificate file, and the private key file (PEM).
 *
 * Returns the HttpSSL object allocated from conf->cfg->pool, or NULL on any
 * error. The SSL_CTX is released on all error paths.
 */
static HttpSSL* create_http_ssl(ListenerConfig *conf) {
    SSL_CTX *ctx = SSL_CTX_new(SSLv23_server_method());
    if(!ctx) {
        return NULL;
    }
    SSL_CTX_set_options(ctx, SSL_OP_SINGLE_DH_USE | SSL_OP_NO_SSLv3);
    SSL_CTX_set_mode(ctx, SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER);
    SSL_CTX_set_mode(ctx, SSL_MODE_ENABLE_PARTIAL_WRITE);

    int error = 0;
    if(conf->disable_proto.ptr) {
        cxstring *plist = NULL;
        ssize_t n = cx_strsplit_a(cxDefaultAllocator, conf->disable_proto, cx_str(","), LISTENER_MAX_PROTOCOL_TOKENS, &plist);
        if(plist) {
            for(int i=0;i<n;i++) {
                cxstring proto = plist[i];
                /* trimmed once; used for all comparisons below */
                cxstring name = cx_strtrim(proto);
                /*
                 * NOTE(review): proto.ptr is printed with %s; if the split
                 * tokens are views into the original list they are not
                 * terminated at the token end - confirm.
                 */
                log_ereport(
                        LOG_VERBOSE,
                        "Listener %s: Disable protocol %s",
                        conf->name.ptr,
                        proto.ptr);
                if(!cx_strcasecmp(name, cx_str("SSLv2"))) {
                    SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv2);
                }
                else if(!cx_strcasecmp(name, cx_str("SSLv3"))) {
                    SSL_CTX_set_options(ctx, SSL_OP_NO_SSLv3);
                }
                else if(!cx_strcasecmp(name, cx_str("TLSv1"))) {
                    SSL_CTX_set_options(ctx, SSL_OP_NO_TLSv1);
                }
                else if(!cx_strcasecmp(name, cx_str("TLSv1.1"))) {
#ifdef SSL_OP_NO_TLSv1_1
                    SSL_CTX_set_options(ctx, SSL_OP_NO_TLSv1_1);
#else
                    log_ereport(
                            LOG_WARN,
                            "Listener: %s: TLSv1.1 not supported",
                            conf->name.ptr);
#endif
                }
                /* the comparison must be negated like the ones above:
                 * cx_strcasecmp yields 0 for equal strings */
                else if(!cx_strcasecmp(name, cx_str("TLSv1.2"))) {
#ifdef SSL_OP_NO_TLSv1_2
                    SSL_CTX_set_options(ctx, SSL_OP_NO_TLSv1_2);
#else
                    log_ereport(
                            LOG_WARN,
                            "Listener: %s: TLSv1.2 not supported",
                            conf->name.ptr);
#endif
                }
                else if(!cx_strcasecmp(name, cx_str("TLSv1.3"))) {
#ifdef SSL_OP_NO_TLSv1_3
                    SSL_CTX_set_options(ctx, SSL_OP_NO_TLSv1_3);
#else
                    log_ereport(
                            LOG_WARN,
                            "Listener: %s: TLSv1.3 not supported",
                            conf->name.ptr);
#endif
                }
                else {
                    error = 1;
                    log_ereport(
                            LOG_MISCONFIG,
                            "Listener: %s: Unknown protocol %s",
                            conf->name.ptr,
                            proto.ptr);
                }
            }
            free(plist);
        }
    }
    if(error) {
        SSL_CTX_free(ctx);
        return NULL;
    }

    int ret;
    char errbuf[512];
    error = 0;
    if(conf->chainfile.ptr) {
        ret = SSL_CTX_use_certificate_chain_file(ctx, conf->chainfile.ptr);
        if(!ret) {
            ERR_error_string(ERR_get_error(), errbuf);
            log_ereport(LOG_MISCONFIG, "Cannot load ssl chain file: %s", errbuf);
            error = 1;
        }
    }
    else if(conf->certfile.ptr) {
        ret = SSL_CTX_use_certificate_file(ctx, conf->certfile.ptr, SSL_FILETYPE_PEM);
        if(!ret) {
            ERR_error_string(ERR_get_error(), errbuf);
            log_ereport(LOG_MISCONFIG, "Cannot load ssl cert file: %s", errbuf);
            error = 1;
        }
    }
    else {
        log_ereport(
                LOG_MISCONFIG,
                "Listener %s: no CertChain/Cert specified", conf->name.ptr);
        error = 1;
    }

    if(!conf->privkeyfile.ptr) {
        /* do not hand a NULL file name to OpenSSL */
        log_ereport(
                LOG_MISCONFIG,
                "Listener %s: no private key file specified", conf->name.ptr);
        error = 1;
    }
    else {
        ret = SSL_CTX_use_PrivateKey_file(ctx, conf->privkeyfile.ptr, SSL_FILETYPE_PEM);
        if(!ret) {
            ERR_error_string(ERR_get_error(), errbuf);
            log_ereport(LOG_MISCONFIG, "Cannot load ssl key file: %s", errbuf);
            error = 1;
        }
    }
    if(error) {
        SSL_CTX_free(ctx);
        return NULL;
    }

    HttpSSL *ssl = pool_malloc(conf->cfg->pool, sizeof(HttpSSL));
    if(!ssl) {
        SSL_CTX_free(ctx);
        return NULL;
    }
    ZERO(ssl, sizeof(HttpSSL));
    ssl->sslctx = ctx;
    return ssl;
}
/*
 * Creates a TCP server socket for the given protocol ("ipv4", anything else
 * is treated as IPv6) and binds it to the wildcard address on conf->port.
 *
 * SO_REUSEADDR is enabled and, when built with LINUX defined, IPv6 sockets
 * get IPV6_V6ONLY. The results of these setsockopt calls are not checked.
 * The socket is only bound here, listen() is not called.
 *
 * Returns a malloc'ed, zero-initialized WSSocket holding the descriptor and
 * the bound address, or NULL if socket(), bind() or the allocation failed.
 */
static WSSocket* create_socket(ListenerConfig *conf, const char *protocol) {
    WSBool ipv4 = strcmp(protocol, "ipv4") == 0 ? TRUE : FALSE;

    int fd = -1;
    if(ipv4) {
        fd = socket(AF_INET, SOCK_STREAM, 0);
    }
    else {
        fd = socket(AF_INET6, SOCK_STREAM, IPPROTO_TCP);
    }
    if(fd < 0) {
        log_ereport(
                LOG_FAILURE,
                "cannot create socket: protocol: %s port: %d error: %s",
                protocol,
                conf->port,
                strerror(errno));
        return NULL;
    }

    int enable = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &enable, sizeof(int));
#ifdef LINUX
    if(!ipv4) {
        enable = 1;
        setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &enable, sizeof(int));
    }
#endif

    /* fill in the wildcard address for the selected address family */
    union ws_socketaddr addr;
    struct sockaddr *bind_addr;
    size_t bind_addr_size;
    if(ipv4) {
        memset(&addr.addr4, 0, sizeof(addr.addr4));
        addr.addr4.sin_family = AF_INET;
        addr.addr4.sin_addr.s_addr = htonl(INADDR_ANY);
        addr.addr4.sin_port = htons(conf->port);
        bind_addr = (struct sockaddr *)&addr.addr4;
        bind_addr_size = sizeof(addr.addr4);
    }
    else {
        memset(&addr.addr6, 0, sizeof(addr.addr6));
        addr.addr6.sin6_family = AF_INET6;
        addr.addr6.sin6_addr = in6addr_any;
        addr.addr6.sin6_port = htons(conf->port);
        bind_addr = (struct sockaddr *)&addr.addr6;
        bind_addr_size = sizeof(addr.addr6);
    }

    if(bind(fd, bind_addr, bind_addr_size)) {
        log_ereport(
                LOG_FAILURE,
                "cannot bind socket: protocol: %s port: %d error: %s",
                protocol,
                conf->port,
                strerror(errno));
        close(fd);
        return NULL;
    }

    WSSocket *result = malloc(sizeof(WSSocket));
    if(!result) {
        close(fd);
        return NULL;
    }
    ZERO(result, sizeof(WSSocket));
    result->socket = fd;
    result->addr = addr;
    /* sockaddr must point into the copy stored in the WSSocket itself */
    result->sockaddr = ipv4
            ? (struct sockaddr *)&result->addr.addr4
            : (struct sockaddr *)&result->addr.addr6;
    result->sockaddr_size = bind_addr_size;
    return result;
}
/*
 * Returns the server socket for the given protocol and conf->port.
 *
 * Sockets are looked up in listener_socket_map under the key
 * "<protocol>:<port>". If no entry exists, a new socket is created with
 * create_socket() and stored in the map.
 *
 * Returns NULL if no socket exists and a new one could not be created.
 */
static WSSocket* get_socket(ListenerConfig *conf, const char *protocol) {
    char keybuf[32];
    size_t keylen = snprintf(keybuf, sizeof(keybuf), "%s:%d", protocol, conf->port);
    CxHashKey key = cx_hash_key(keybuf, keylen);

    WSSocket *found = cxMapGet(listener_socket_map, key);
    if(found) {
        return found;
    }

    WSSocket *created = create_socket(conf, protocol);
    if(created) {
        cxMapPut(listener_socket_map, key, created);
    }
    return created;
}
/*
 * Returns the ssl object of the first socket that is non-NULL and has one,
 * checking socket1 before socket2. Returns NULL if neither has one.
 */
static HttpSSL* socket_get_ssl(WSSocket *socket1, WSSocket *socket2) {
    WSSocket *candidates[2] = { socket1, socket2 };
    for(int i=0;i<2;i++) {
        WSSocket *s = candidates[i];
        if(s && s->ssl) {
            return s->ssl;
        }
    }
    return NULL;
}
/*
 * Creates an HttpListener from a listener configuration.
 *
 * The listener itself is allocated from conf->cfg->pool. Server sockets are
 * obtained with get_socket() for both IPv4 and IPv6; at least one of them
 * must be available. For every configured acceptor one IPv4 and one IPv6
 * Acceptor object is created. If conf->ssl is set, the ssl object already
 * attached to one of the sockets is reused, otherwise a new one is created
 * with create_http_ssl() and attached to the available sockets.
 *
 * The sockets are referenced (wssocket_ref) and the ssl object is attached
 * only after all steps that can fail have succeeded, so a failed creation
 * leaves neither an additional socket reference nor acceptor objects behind.
 *
 * Returns the new listener or NULL on error.
 */
static HttpListener* listener_create(ListenerConfig *conf) {
    pool_handle_t *pool = conf->cfg->pool;
    HttpListener *listener = pool_malloc(pool, sizeof(HttpListener));
    if(!listener) {
        return NULL;
    }
    ZERO(listener, sizeof(HttpListener));

    listener->running = 0;
    listener->cfg = conf->cfg;
    listener->name = cx_strdup_a(pool_allocator(pool), cx_strcast(conf->name));
    listener->default_vs.vs_name = pool_strdup(pool, conf->vs.ptr);

    /* use the configured thread pool, fall back to the default pool */
    listener->threadpool = NULL;
    if(conf->threadpool.ptr != NULL) {
        listener->threadpool = get_threadpool(cx_strcast(conf->threadpool));
    }
    if(listener->threadpool == NULL) {
        listener->threadpool = get_default_threadpool();
    }

    if(conf->blockingio) {
        listener->session_handler = create_basic_session_handler(pool);
    }
    else {
        listener->session_handler = create_event_session_handler(pool);
    }

    listener->nacceptors = conf->nacceptors;
    listener->port = conf->port;
    listener->next = NULL;

    listener->server_socket = get_socket(conf, LISTENER_PROTO_IPV4);
    listener->server_socket6 = get_socket(conf, LISTENER_PROTO_IPV6);
    if(!listener->server_socket && !listener->server_socket6) {
        log_ereport(
                LOG_FAILURE,
                "Listener %s: no server socket", conf->name.ptr);
        return NULL;
    }

    /* acceptor objects: one per acceptor and address family */
    listener->acceptors = calloc(listener->nacceptors, sizeof(void*));
    listener->acceptors6 = calloc(listener->nacceptors, sizeof(void*));
    if(listener->nacceptors > 0 && (!listener->acceptors || !listener->acceptors6)) {
        log_ereport(
                LOG_FAILURE,
                "Listener %s: cannot allocate acceptors", conf->name.ptr);
        goto fail;
    }
    for(int i=0;i<listener->nacceptors;i++) {
        listener->acceptors[i] = acceptor_new(listener);
        listener->acceptors6[i] = acceptor_new(listener);
        if(!listener->acceptors[i] || !listener->acceptors6[i]) {
            log_ereport(
                    LOG_FAILURE,
                    "Listener %s: cannot allocate acceptors", conf->name.ptr);
            goto fail;
        }
        listener->acceptors6[i]->ipv6 = TRUE;
    }

    if(conf->ssl) {
        HttpSSL *ssl = socket_get_ssl(listener->server_socket, listener->server_socket6);
        if(!ssl) {
            ssl = create_http_ssl(conf);
            if(!ssl) {
                log_ereport(
                        LOG_FAILURE,
                        "Listener %s: cannot create SSL context", conf->name.ptr);
                goto fail;
            }
        }
        if(listener->server_socket) {
            listener->server_socket->ssl = ssl;
        }
        if(listener->server_socket6) {
            listener->server_socket6->ssl = ssl;
        }
    }

    /* nothing can fail anymore: take the socket references */
    if(listener->server_socket) {
        wssocket_ref(listener->server_socket);
    }
    if(listener->server_socket6) {
        wssocket_ref(listener->server_socket6);
    }
    return listener;

fail:
    /* the arrays come from calloc, so unused slots are NULL and safe to free */
    if(listener->acceptors) {
        for(int i=0;i<listener->nacceptors;i++) {
            free(listener->acceptors[i]);
        }
    }
    if(listener->acceptors6) {
        for(int i=0;i<listener->nacceptors;i++) {
            free(listener->acceptors6[i]);
        }
    }
    free(listener->acceptors);
    free(listener->acceptors6);
    listener->acceptors = NULL;
    listener->acceptors6 = NULL;
    return NULL;
}
/*
 * Creates a listener from the given configuration while holding
 * listener_mutex, so that calls to listener_create() are serialized.
 *
 * Returns the result of listener_create() (NULL on failure).
 */
HttpListener* http_listener_create(ListenerConfig *conf) {
    HttpListener *created;

    pthread_mutex_lock(&listener_mutex);
    created = listener_create(conf);
    pthread_mutex_unlock(&listener_mutex);

    return created;
}
/*
 * Releases the synchronization objects of a listener.
 *
 * The shutdown mutex and condition variable are only destroyed when the
 * listener's shutdown flag is set, i.e. when they have been initialized by
 * http_listener_shutdown_acceptors().
 */
void http_listener_destroy(HttpListener *listener) {
    log_ereport(
            LOG_DEBUG,
            "destroy http listener: %s config: %p", listener->name.ptr, listener->cfg);

    if(!listener->shutdown) {
        return;
    }
    pthread_mutex_destroy(&listener->shutdown_mutex);
    pthread_cond_destroy(&listener->shutdown_cond);
}
/*
 * Starts a listener: puts its server sockets into the listening state (if
 * they are not listening already) and starts the acceptor threads for every
 * available address family.
 *
 * A listener may have only one of the two server sockets; a missing socket
 * is skipped instead of being dereferenced.
 *
 * Returns 0 on success (or if the listener is already running) and -1 if
 * listen() failed.
 */
int http_listener_start(HttpListener *listener) {
    if(listener->running) {
        return 0;
    }
    log_ereport(
            LOG_INFORM,
            "start listener on port %d", listener->port);

    WSBool ipv4 = listener->server_socket ? TRUE : FALSE;
    WSBool ipv6 = listener->server_socket6 ? TRUE : FALSE;

    if(ipv4 && !listener->server_socket->listening) {
        if(listen(listener->server_socket->socket, 256) == -1) {
            log_ereport(
                    LOG_FAILURE,
                    "http_listener_start: listen failed: %s", strerror(errno));
            return -1;
        }
        listener->server_socket->listening = TRUE;
    }
    if(ipv6 && !listener->server_socket6->listening) {
        if(listen(listener->server_socket6->socket, 256) == -1) {
            log_ereport(
                    LOG_FAILURE,
                    "http_listener_start: listen v6 failed: %s", strerror(errno));
            return -1;
        }
        listener->server_socket6->listening = TRUE;
    }

    for(int i=0;i<listener->nacceptors;i++) {
        if(ipv4) {
            acceptor_start(listener->acceptors[i]);
        }
        if(ipv6) {
            acceptor_start(listener->acceptors6[i]);
        }
    }
    listener->running = TRUE;
    return 0;
}
/*
 * Checks whether two listeners share a server socket.
 *
 * Returns TRUE if both listeners have the same (non-NULL) IPv4 socket or
 * the same (non-NULL) IPv6 socket, FALSE otherwise.
 */
int http_listener_socket_eq(HttpListener *l1, HttpListener *l2) {
    /* equality with a non-NULL pointer implies the other one is non-NULL too */
    int same_v4 = l1->server_socket != NULL && l1->server_socket == l2->server_socket;
    int same_v6 = l1->server_socket6 != NULL && l1->server_socket6 == l2->server_socket6;
    if(same_v4 || same_v6) {
        return TRUE;
    }
    return FALSE;
}
/*
 * Appends 'next' to the end of the listener chain that starts at 'listener'
 * (the chain is linked through the 'next' member).
 */
void http_listener_set_next(HttpListener *listener, HttpListener *next) {
    HttpListener *tail = listener;
    for(; tail->next; tail = tail->next) {
        /* walk to the last element */
    }
    tail->next = next;
}
/*
 * Opens a client connection to one of the listener's own server sockets.
 *
 * listener: the listener to connect to
 * ipv6:     selects the IPv6 server socket if true, the IPv4 one otherwise
 *
 * Returns the connected client descriptor (the caller must close it) or -1
 * on error. On error errno describes the failure; it is set to EINVAL when
 * the listener has no server socket for the requested address family.
 */
int http_listener_connect(HttpListener *listener, WSBool ipv6) {
    WSSocket *target = ipv6 ? listener->server_socket6 : listener->server_socket;
    if(!target) {
        /* a listener may exist with only one of the two sockets */
        errno = EINVAL;
        return -1;
    }

    int domain = ipv6 ? AF_INET6 : AF_INET;
    int client = socket(domain, SOCK_STREAM, 0);
    if(client < 0) {
        return -1;
    }

    if(connect(client, target->sockaddr, target->sockaddr_size) < 0) {
        /* keep the connect() error for the caller, close() may change errno */
        int connect_errno = errno;
        close(client);
        errno = connect_errno;
        return -1;
    }
    return client;
}
/*
 * Asks all acceptor threads of a listener to exit and waits for them.
 *
 * The shutdown mutex and condition variable are initialized here and the
 * listener's shutdown flag is set. All acceptors get their exit flag set
 * first; afterwards one dummy connection per acceptor is made to each
 * existing server socket so that threads get a connection to return from
 * accept() with and then see the flag. Finally the function waits up to 60 seconds
 * on the shutdown condition, which acceptor_thread() signals when the last
 * acceptor has finished.
 */
void http_listener_shutdown_acceptors(HttpListener *listener) {
    /* pthread functions return the error code, they do not set errno */
    int rc = pthread_mutex_init(&listener->shutdown_mutex, NULL);
    if(rc) {
        log_ereport(
                LOG_FAILURE,
                "http_listener_shutdown_acceptors: pthread_mutex_init failed: %s", strerror(rc));
        return;
    }
    rc = pthread_cond_init(&listener->shutdown_cond, NULL);
    if(rc) {
        log_ereport(
                LOG_FAILURE,
                "http_listener_shutdown_acceptors: pthread_cond_init failed: %s", strerror(rc));
        /* the shutdown flag stays unset, so nobody else would destroy the mutex */
        pthread_mutex_destroy(&listener->shutdown_mutex);
        return;
    }
    listener->shutdown = TRUE;

    log_ereport(
            LOG_INFORM,
            "shutdown http listener %s", listener->name.ptr);

    pthread_mutex_lock(&listener->shutdown_mutex);

    /*
     * Set every exit flag before the first wake-up connection is made,
     * otherwise an acceptor without the flag could consume a wake-up
     * connection and keep running.
     */
    for(int i=0;i<listener->nacceptors;i++) {
        listener->acceptors[i]->exit = TRUE;
        listener->acceptors6[i]->exit = TRUE;
    }

    for(int i=0;i<listener->nacceptors;i++) {
        if(listener->server_socket) {
            int client4 = http_listener_connect(listener, FALSE);
            if(client4 < 0) {
                log_ereport(
                        LOG_FAILURE,
                        "http_listener_shutdown_acceptors: cannot connect to ipv4 server socket: %s", strerror(errno));
            }
            else {
                close(client4);
            }
        }
        if(listener->server_socket6) {
            int client6 = http_listener_connect(listener, TRUE);
            if(client6 < 0) {
                log_ereport(
                        LOG_FAILURE,
                        "http_listener_shutdown_acceptors: cannot connect to ipv6 server socket: %s", strerror(errno));
            }
            else {
                close(client6);
            }
        }
    }

    /* absolute timeout; tv_nsec must be a valid value as well */
    struct timespec ts;
    ts.tv_sec = time(NULL) + 60;
    ts.tv_nsec = 0;
    pthread_cond_timedwait(&listener->shutdown_cond, &listener->shutdown_mutex, &ts);
    pthread_mutex_unlock(&listener->shutdown_mutex);
}
/*
 * Allocates a new Acceptor for the given listener.
 *
 * The object is zero-initialized, so members that are not set explicitly
 * here do not contain indeterminate values. The acceptor is created as an
 * IPv4 acceptor with the exit flag cleared.
 *
 * Returns the acceptor or NULL if the allocation failed.
 */
Acceptor* acceptor_new(HttpListener *listener) {
    Acceptor *acceptor = calloc(1, sizeof(Acceptor));
    if(!acceptor) {
        return NULL;
    }
    acceptor->listener = listener;
    acceptor->ipv6 = WS_FALSE;
    acceptor->exit = WS_FALSE;
    return acceptor;
}
/*
 * Starts the acceptor thread (acceptor_thread) for an acceptor.
 *
 * A reference to the listener's configuration is taken before the thread is
 * created; acceptor_thread() releases it when it terminates. If the thread
 * cannot be created, the reference is released here. The thread is
 * detached after creation.
 */
void acceptor_start(Acceptor *a) {
    cfg_ref(a->listener->cfg);

    /* pthread_create returns the error code, it does not set errno */
    int rc = pthread_create(
            &a->tid,
            NULL,
            (void*(*)(void*))acceptor_thread,
            a);
    if(rc != 0) {
        log_ereport(
                LOG_FAILURE,
                "Listener %s: acceptor_start: %s acceptor", a->listener->name.ptr, strerror(rc));
        cfg_unref(a->listener->cfg);
    }
    else {
        (void)pthread_detach(a->tid);
    }
}
/*
 * Thread function of an acceptor.
 *
 * Accepts connections on the listener's IPv4 or IPv6 server socket
 * (depending on acceptor->ipv6), wraps each one in a Connection and hands
 * it to the session handler of the last listener in the 'next' chain.
 * For sockets with an ssl object the client descriptor is switched to
 * non-blocking mode and an SSL object is attached to the connection.
 *
 * The loop ends when the acceptor's exit flag is set or when the listener
 * has a successor in its 'next' chain. When the last running acceptor of a
 * listener that is shutting down exits, the shutdown condition is signaled.
 * The configuration reference taken in acceptor_start() is released at the
 * end.
 *
 * Always returns NULL.
 */
void* acceptor_thread(Acceptor *acceptor) {
    WS_ASSERT(acceptor);
    WS_ASSERT(acceptor->listener);
    WS_ASSERT(acceptor->listener->cfg);
    WS_ASSERT(acceptor->listener->session_handler);
    WS_ASSERT(acceptor->listener->session_handler->enqueue_connection);

    acceptor->running = TRUE;

    HttpListener *listener = acceptor->listener;
    int server_socket;
    uint32_t *acceptors_running = &listener->nacceptors_running;

    /* select socket, address buffer and ssl settings by address family */
    ConnectionAddr ca;
    struct sockaddr *ca_ptr;
    socklen_t ca_length;
    ConnectionAddrType addr_type;
    HttpSSL *socket_ssl = NULL;
    if(acceptor->ipv6) {
        server_socket = listener->server_socket6->socket;
        ca_ptr = (struct sockaddr*)&ca.address_v6;
        ca_length = sizeof(ca.address_v6);
        addr_type = CONN_ADDR_IPV6;
        socket_ssl = listener->server_socket6->ssl;
    }
    else {
        server_socket = listener->server_socket->socket;
        ca_ptr = (struct sockaddr*)&ca.address_v4;
        ca_length = sizeof(ca.address_v4);
        addr_type = CONN_ADDR_IPV4;
        socket_ssl = listener->server_socket->ssl;
    }

    log_ereport(
            LOG_DEBUG,
            "acceptor: %p listener: %p start", acceptor, acceptor->listener);

    ws_atomic_inc32(acceptors_running);

    for (;;) {
        socklen_t length = ca_length;
        int clientfd = accept(server_socket, ca_ptr, &length);
        /* save errno right away, the log call below may change it */
        int accept_errno = errno;
        log_ereport(
                LOG_DEBUG,
                "acceptor: %p listener: %p: accept(): %d", acceptor, acceptor->listener, clientfd);
        if (clientfd == -1) {
            log_ereport(
                    LOG_FAILURE,
                    "accept %s failed: %s", acceptor->ipv6 ? "ipv6" : "ipv4", strerror(accept_errno));
            if(acceptor->exit) {
                log_ereport(
                        LOG_VERBOSE,
                        "acceptor thread %p: listener: %p exit", acceptor, acceptor->listener);
                break;
            }
            continue;
        }

        /*
         * If the listener has successors, the connection goes to the last
         * listener of the chain and this acceptor exits afterwards.
         */
        HttpListener *ls = listener;
        int acceptor_exit = 0;
        while(ls->next) {
            ls = ls->next;
            acceptor_exit = 1;
        }

        Connection *conn = malloc(sizeof(Connection));
        if(!conn) {
            log_ereport(
                    LOG_FAILURE,
                    "acceptor: %p listener: %p cannot allocate connection", acceptor, acceptor->listener);
            close(clientfd);
        }
        else {
            /* random connection id; fall back to 0 if no random data is available */
            if(RAND_bytes((unsigned char*)&conn->id, sizeof(conn->id)) != 1) {
                conn->id = 0;
            }
            conn->id <<= 16;
            conn->address = ca;
            conn->addr_type = addr_type;
            conn->fd = clientfd;
            conn->listener = ls;
            conn->ssl_accepted = 0;
            conn->ssl_error = 0;

            log_ereport(
                    LOG_DEBUG,
                    "trace reqid: %016llx connection accepted", (unsigned long long int)conn->id);

            if(socket_ssl) {
                /* SSL connections use a non-blocking descriptor */
                int flags;
                if((flags = fcntl(conn->fd, F_GETFL, 0)) == -1) {
                    flags = 0;
                }
                if(fcntl(conn->fd, F_SETFL, flags | O_NONBLOCK)) {
                    int fcntl_errno = errno;
                    log_ereport(
                            LOG_DEBUG,
                            "trace reqid: %016llx connection destroyed | fcntl error", (unsigned long long int)conn->id);
                    log_ereport(
                            LOG_FAILURE,
                            "acceptor: fcntl failed: %s", strerror(fcntl_errno));
                    close(clientfd);
                    free(conn);
                    conn = NULL;
                }
                else {
                    SSL *ssl = SSL_new(socket_ssl->sslctx);
                    if(ssl) {
                        SSL_set_fd(ssl, clientfd);
                        conn->ssl = ssl;
                        conn->read = connection_ssl_read;
                        conn->write = connection_ssl_write;
                        conn->close = connection_ssl_close;
                    }
                    else {
                        log_ereport(
                                LOG_DEBUG,
                                "trace reqid: %016llx connection destroyed | SSL init error", (unsigned long long int)conn->id);
                        log_ereport(
                                LOG_FAILURE,
                                "acceptor: %p listener: %p SSL_new() failed", acceptor, acceptor->listener);
                        free(conn);
                        close(clientfd);
                        conn = NULL;
                    }
                }
            }
            else {
                conn->ssl = NULL;
                conn->read = connection_read;
                conn->write = connection_write;
                conn->close = connection_close;
            }
        }

        if(conn) {
            /* the connection holds a reference to the configuration */
            cfg_ref(ls->cfg);
            ls->session_handler->enqueue_connection(
                    ls->session_handler,
                    conn);
        }

        if(acceptor_exit || acceptor->exit) {
            log_ereport(
                    LOG_VERBOSE,
                    "acceptor thread %p: listener: %p exit", acceptor, acceptor->listener);
            break;
        }
    }

    if(ws_atomic_dec32(acceptors_running) == 0) {
        if(listener->shutdown) {
            log_ereport(
                    LOG_DEBUG,
                    "last acceptor shutdown: notify cfgmgr");
            pthread_mutex_lock(&listener->shutdown_mutex);
            pthread_cond_signal(&listener->shutdown_cond);
            pthread_mutex_unlock(&listener->shutdown_mutex);
        }
    }

    acceptor->running = FALSE;

    cfg_unref(acceptor->listener->cfg);

    return NULL;
}
/*
 * Increments the reference counter of a server socket using the atomic
 * increment helper.
 */
void wssocket_ref(WSSocket *ws) {
    (void)ws_atomic_inc32(&(ws->ref));
}
/*
 * Decrements the reference counter of a server socket using the atomic
 * decrement helper. The socket is not released here, regardless of the
 * resulting counter value.
 */
void wssocket_unref(WSSocket *ws) {
    (void)ws_atomic_dec32(&(ws->ref));
}
/*
 * Enables TCP keep-alive on a socket and configures the probe timing:
 * SO_KEEPALIVE = 1, TCP_KEEPIDLE = 240, TCP_KEEPINTVL = 10, TCP_KEEPCNT = 3.
 *
 * listener: not used by this function
 * fd:       socket descriptor to configure
 *
 * The options are applied in the order listed above; the first failing
 * setsockopt call is logged and ends the function.
 *
 * Returns 0 on success and 1 if an option could not be set.
 */
int http_listener_apply_keep_alive_settings(HttpListener *listener, int fd) {
    int enable = 1;
    int idle = 240;
    int interval = 10;
    int probes = 3;

    if(setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &enable, sizeof(enable)) != 0) {
        log_ereport(
                LOG_FAILURE,
                "listener: cannot set SO_KEEPALIVE: %s", strerror(errno));
        return 1;
    }

    if(setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle)) != 0) {
        log_ereport(
                LOG_FAILURE,
                "listener: cannot set TCP_KEEPIDLE to value %d: %s", idle, strerror(errno));
        return 1;
    }

    if(setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(interval)) != 0) {
        log_ereport(
                LOG_FAILURE,
                "listener: cannot set TCP_KEEPINTVL to value %d: %s", interval, strerror(errno));
        return 1;
    }

    if(setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &probes, sizeof(probes)) != 0) {
        log_ereport(
                LOG_FAILURE,
                "listener: cannot set TCP_KEEPCNT to value %d: %s", probes, strerror(errno));
        return 1;
    }

    return 0;
}