src/server/daemon/sessionhandler.c

Sat, 13 Jan 2018 18:48:19 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sat, 13 Jan 2018 18:48:19 +0100
branch
aio
changeset 192
6a145e13d933
parent 191
391ccd490d97
child 193
aa8393527b1e
permissions
-rw-r--r--

replaces eventfd with pipe and closes aio branch

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2013 Olaf Wintermann. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 */

#include <stdio.h>
#include <stdlib.h>
#include <errno.h>

#include "../public/nsapi.h"

#include "sessionhandler.h"
#include "httprequest.h"
#include "httpparser.h"
#include "log.h"
#include "error.h"
#include "httplistener.h"

typedef struct _event_http_io {
    HTTPRequest  *request;
    HttpParser   *parser;
    int          error;
} EventHttpIO;


int connection_read(Connection *conn, void *buf, int len) {
    return (int)read(conn->fd, buf, len);
}

int connection_write(Connection *conn, const void *buf, int len) {
    return (int)write(conn->fd, buf, len);
}

void connection_close(Connection *conn) {
    close(conn->fd);
}

int connection_ssl_read(Connection *conn, void *buf, int len) {
    int ret = SSL_read(conn->ssl, buf, len);
    if(ret <= 0) {
        conn->ssl_error = SSL_get_error(conn->ssl, ret);
    }
    return ret;
}

int connection_ssl_write(Connection *conn, const void *buf, int len) {
    int ret = SSL_write(conn->ssl, buf, len);
    if(ret <= 0) {
        conn->ssl_error = SSL_get_error(conn->ssl, ret);
    }
    return ret;
}

void connection_ssl_close(Connection *conn) {
    if(!conn->ssl_error) {
        int ret = SSL_shutdown(conn->ssl);
        if(ret != 1) {
            conn->ssl_error = SSL_get_error(conn->ssl, ret);
            log_ereport(LOG_VERBOSE, "SSL_shutdown failed: %d", conn->ssl_error);
        }
    }
    close(conn->fd);
}

void connection_destroy(Connection *conn) {
    conn->close(conn);
    if(conn->ssl) {
        SSL_free(conn->ssl);
    }
    free(conn);    
}

IOStream* create_connection_iostream(
        SessionHandler *sh,
        Connection *conn,
        pool_handle_t *pool,
        WSBool *ssl)
{
    IOStream *io = NULL;
    if(conn->ssl) {
        io = sslstream_new(pool, conn->ssl);
        *ssl = 1;
    } else {
        io = sysstream_new(pool, conn->fd);
        *ssl = 0;
    }
    return io;
}


SessionHandler* create_basic_session_handler() {
    BasicSessionHandler *handler = malloc(sizeof(BasicSessionHandler));
    handler->threadpool = threadpool_new(4, 8);
    handler->sh.enqueue_connection = basic_enq_conn;
    handler->sh.keep_alive = basic_keep_alive;
    handler->sh.create_iostream = create_connection_iostream;

    return (SessionHandler*)handler;
}

void basic_enq_conn(SessionHandler *handler, Connection *conn) {
    BasicSessionHandler *sh = (BasicSessionHandler*)handler;
    conn->session_handler = handler;
    threadpool_run(sh->threadpool, basic_run_session, conn);
}

void* basic_run_session(void *data) {
    Connection *conn = (Connection*)data;

    HTTPRequest request;
    http_request_init(&request);
    request.connection = conn;

    // read request
    netbuf *buf = malloc(sizeof(netbuf));
    buf->rdtimeout = 120;
    buf->pos = 0;
    buf->cursize = 0;
    buf->maxsize = 2048;
    buf->sd = &conn->fd;
    buf->inbuf = malloc(2048);
    buf->errmsg = NULL;

    request.netbuf = buf;

    HttpParser *parser = http_parser_new(&request);
    int state;
    int r;
    r = conn->read(conn, buf->inbuf + buf->pos, buf->maxsize - buf->pos);
    if(r <= 0) {
         // TODO: error handling
        fprintf(stderr, "%s\n", "Error: Cannot read from socket");
        return NULL;
    }
    buf->cursize += r;
    while((state = http_parser_process(parser)) != 0) {
        if(state == 2) {
            // TODO: error handling
            fprintf(stderr, "%s\n", "Error: Cannot parse http request");
            return NULL;
        }
        r = conn->read(conn, buf->inbuf + buf->pos, buf->maxsize - buf->pos);
        if(r == -1) {
            // TODO: error handling
            fprintf(stderr, "%s\n", "Error: Cannot read from socket");
            return NULL;
        }
        buf->cursize += r;
    }
    
    // process request
    r = handle_request(&request, NULL, NULL); // TODO: use correct thread pool
    
    // TODO: free, see evt_request_finish

    return NULL;
}

void basic_keep_alive(SessionHandler *handler, Connection *conn) {
    connection_destroy(conn);
}


/* ----- event session handler ----- */

SessionHandler* create_event_session_handler() {
    EventSessionHandler *handler = malloc(sizeof(EventSessionHandler));
    handler->eventhandler = get_default_event_handler();
    handler->sh.enqueue_connection = evt_enq_conn;
    handler->sh.keep_alive = evt_keep_alive;
    handler->sh.create_iostream = create_connection_iostream;
    return (SessionHandler*)handler;
}

void evt_enq_conn(SessionHandler *handler, Connection *conn) {
    HTTPRequest *request = malloc(sizeof(HTTPRequest));
    http_request_init(request);
    request->connection = conn;
    conn->session_handler = handler;
    
    // set socket non blocking
    int flags;
    if ((flags = fcntl(conn->fd, F_GETFL, 0)) == -1) {
        flags = 0;
    }
    if (fcntl(conn->fd, F_SETFL, flags | O_NONBLOCK) != 0) {
        perror("Error: start_event_session: fcntl");
        // TODO: error
    }
    
    // TODO: remove code redundancy (basic_run_session)
    
    // read request
    netbuf *buf = malloc(sizeof(netbuf));
    buf->rdtimeout = 120;
    buf->pos = 0;
    buf->cursize = 0;
    buf->maxsize = 2048;
    buf->sd = &conn->fd;
    buf->inbuf = malloc(2048);
    buf->errmsg = NULL;

    request->netbuf = buf;
    
    HttpParser *parser = http_parser_new(request);
    
    EventHttpIO *io = malloc(sizeof(EventHttpIO));
    if(io == NULL) {
        // TODO: error handling
    }
    io->request = request;
    io->parser  = parser;
    io->error = 0;
    
    /*
     * to start the request handling, we begin with a poll on the socket,
     * 
     * evt_enq_conn() --> event handler --> handle_request()
     */
    
    Event *event = malloc(sizeof(Event));
    ZERO(event, sizeof(Event));
    event->fn = conn->ssl ? evt_request_ssl_accept : evt_request_input;
    event->finish = evt_request_finish;
    event->cookie = io;
    
    EventHandler *ev = ev_instance(((EventSessionHandler*)handler)->eventhandler);
    
    if(ev_pollin(ev, conn->fd, event) != 0) {
        // TODO: ev_pollin should log, intercept some errors here
        log_ereport(LOG_FAILURE, "Cannot enqueue connection");
        connection_destroy(conn);
        // TODO: free stuff
    }
}

int evt_request_ssl_accept(EventHandler *handler, Event *event) {
    EventHttpIO *io = event->cookie;
    Connection  *conn = io->request->connection;
    
    int ret = SSL_accept(conn->ssl);
    if(ret <= 0) {
        int error = SSL_get_error(conn->ssl, ret);
        char *errstr;
        switch(error) {
            default: errstr = "unknown"; break;
            case SSL_ERROR_WANT_READ: {
                event->events = EVENT_POLLIN;
                return 1;
            }
            case SSL_ERROR_WANT_WRITE: {
                event->events = EVENT_POLLOUT;
                return 1;
            }
            case SSL_ERROR_ZERO_RETURN: errstr = "SSL_ERROR_ZERO_RETURN"; break;
            case SSL_ERROR_WANT_CONNECT: errstr = "SSL_ERROR_WANT_CONNECT"; break;
            case SSL_ERROR_WANT_ACCEPT: errstr = "SSL_ERROR_WANT_ACCEPT"; break;
            case SSL_ERROR_WANT_X509_LOOKUP: errstr = "SSL_ERROR_WANT_X509_LOOKUP"; break;
            case SSL_ERROR_SYSCALL: errstr = "SSL_ERROR_SYSCALL"; break;
            case SSL_ERROR_SSL: errstr = "SL_ERROR_SSL"; break;
            
            log_ereport(LOG_VERBOSE, "SSL accept error[%d]: %s", error, errstr);
            event->finish = evt_request_error;
            io->error = 1;
            return 0;
        }
    }
    
    // SSL_accept successful, start request input now
    event->fn = evt_request_input;
    return evt_request_input(handler, event);
}

int evt_request_input(EventHandler *handler, Event *event) {    
    EventHttpIO *io = event->cookie;
    HttpParser  *parser  = io->parser;
    HTTPRequest *request = io->request;
    Connection  *conn = io->request->connection;
    netbuf      *buf     = request->netbuf;
    
    int state;
    int r;
    r = conn->read(
            conn,
            buf->inbuf + buf->pos,
            buf->maxsize - buf->pos);
    if(r <= 0) {
        if(conn->ssl) {
            // SSL specific error handling
            switch(conn->ssl_error) {
                case SSL_ERROR_WANT_READ: {
                    event->events = EVENT_POLLIN;
                    return 1;
                }
                case SSL_ERROR_WANT_WRITE: {
                    event->events = EVENT_POLLOUT;
                    return 1;
                }
            }
        }
        
        event->finish = evt_request_error;
        io->error = 1;
        return 0;
    }
    //fwrite(buf->inbuf + buf->pos, 1, r, stdout);
    //printf("\n");
    
    buf->cursize += r;
    state = http_parser_process(parser);
    if(state == 2) {
        // parse error
        fatal_error(request, 400);
        event->finish = evt_request_error;
        io->error = 2;
        return 0;
    } else if(state == 1) {
        /*
         * we need more data -> return 1 to tell the event handler to
         * continue polling
         */
        event->events = EVENT_POLLIN;
        return 1;
    }
    
    // we are done with reading
    
    // set socket blocking
    int flags;
    if (-1 == (flags = fcntl(request->connection->fd, F_GETFL, 0))) {
        flags = 0;
    }
    if (fcntl(request->connection->fd, F_SETFL, flags & ~O_NONBLOCK) != 0) {
        // just close the connection if fcntl fails
        event->finish = evt_request_error;
        io->error = 3;
        return 0;
    }
     
    /*
     * process request
     * 
     * We return 0 to finish request input. The event handler than stops
     * polling and executes event->finish (evt_request_input_finish)
     */
    return 0;
}

int evt_request_finish(EventHandler *h, Event *event) { 
    EventHttpIO *io = event->cookie;
    HttpParser  *parser  = io->parser;
    HTTPRequest *request = io->request;
      
    int r = handle_request(request, NULL, h);
    if(r != 0) {
        // TODO: error message
        close(request->connection->fd);
    }
    
    /*
     * handle_request can return before the request is finished, but it copies
     * all important data. We can free request, parser and event
     * 
     * don't free request->netbuf and request->connection
     */
    http_request_cleanup(request);
    http_parser_free(parser);
    
    free(io);
    free(event);
    
    return 0;
}

int evt_request_error(EventHandler *h, Event *event) { 
    EventHttpIO *io = event->cookie;
    HttpParser  *parser  = io->parser;
    HTTPRequest *request = io->request;
    
    if(event->error) {
        log_ereport(LOG_VERBOSE, "sessionhandler http io error: %d", io->error);
    }
    
    free(request->netbuf->inbuf);
    free(request->netbuf);
    
    cfg_unref(request->connection->listener->cfg);
    connection_destroy(request->connection);
    
    http_request_cleanup(request);
    http_parser_free(parser);
    
    free(io);
    free(event);
    
    return 0;
}

void evt_keep_alive(SessionHandler *handler, Connection *conn) {
    // TODO: set timeout
    
    /* TODO:
     * Don't just re-enqueue the connection
     * create a evt_req_init function which does most of the evt_enq_conn stuff
     * but don't poll.
     * evt_keep_alive should poll and if an event occurs:
     *   evt_req_init
     *   evt_request_input
     * evt_enq_conn should do:
     *   evt_req_init
     *   ev_pollin
     */
    evt_enq_conn(handler, conn);
}

mercurial