refactor send_cgi into non-blocking SAF (wip)

Sat, 12 Nov 2022 20:50:45 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sat, 12 Nov 2022 20:50:45 +0100
changeset 430
83560f32e7d5
parent 429
25c8e8021156
child 431
032b0ad35ee3

refactor send_cgi into non-blocking SAF (wip)

src/server/daemon/sessionhandler.c file | annotate | diff | comparison | revisions
src/server/safs/cgi.c file | annotate | diff | comparison | revisions
src/server/safs/cgi.h file | annotate | diff | comparison | revisions
src/server/util/io.c file | annotate | diff | comparison | revisions
src/server/util/io.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/sessionhandler.c	Sat Nov 12 17:34:58 2022 +0100
+++ b/src/server/daemon/sessionhandler.c	Sat Nov 12 20:50:45 2022 +0100
@@ -117,7 +117,7 @@
         io = sslstream_new(pool, conn->ssl);
         *ssl = 1;
     } else {
-        io = sycx_stream_new(pool, conn->fd);
+        io = Sysstream_new(pool, conn->fd);
         *ssl = 0;
     }
     return io;
--- a/src/server/safs/cgi.c	Sat Nov 12 17:34:58 2022 +0100
+++ b/src/server/safs/cgi.c	Sat Nov 12 20:50:45 2022 +0100
@@ -42,6 +42,7 @@
 #include "../util/pblock.h"
 #include "../daemon/netsite.h"
 #include "../util/io.h"
+#include "../daemon/event.h"
 
 #include "cgiutils.h"
 
@@ -80,13 +81,23 @@
     
     const char *args = pblock_findval("query", rq->reqpb);
     char **argv = cgi_create_argv(path, NULL, args);
+    if(!argv) {
+        return REQ_ABORTED;
+    }
     
     char **env = http_hdrs2env(rq->headers);
     env = cgi_common_vars(sn, rq, env);
     env = cgi_specific_vars(sn, rq, args, env, 1);
     
-    CGIProcess cgip;
-    int ret = cgi_start(&cgip, path, argv, env);
+    // event handler object for non-blocking io event handler
+    CGIHandler *handler = pool_malloc(sn->pool, sizeof(CGIHandler));
+    if(!handler) {
+        return REQ_ABORTED;
+    }
+    ZERO(handler, sizeof(CGIHandler));
+    handler->path = path;
+    
+    int ret = cgi_start(&handler->process, path, argv, env);
     if(ret != REQ_PROCEED) {
         util_env_free(env);
         cgi_free_argv(argv);
@@ -95,7 +106,7 @@
     
     util_env_free(env);
     cgi_free_argv(argv);
-    
+      
     char buf[4096]; // I/O buffer
     ssize_t r;
     
@@ -109,101 +120,173 @@
                         LOG_FAILURE,
                         "send-cgi: script: %s: cannot read request body",
                         path);
-                kill(cgip.pid, SIGTERM);
-                cgi_close(&cgip);
+                kill(handler->process.pid, SIGTERM);
+                cgi_close(&handler->process);
                 return REQ_ABORTED;
             }
-            ssize_t w = write(cgip.in[1], buf, r);
+            ssize_t w = write(handler->process.in[1], buf, r);
             if(w <= 0) {
                 // TODO: handle error
                 log_ereport(
                         LOG_FAILURE,
                         "send-cgi: script: %s: cannot send request body to cgi process",
                         path);
-                kill(cgip.pid, SIGKILL);
-                cgi_close(&cgip);
+                kill(handler->process.pid, SIGKILL);
+                cgi_close(&handler->process);
                 return REQ_ABORTED;
             }
             n += r;
         }
     }
-    system_close(cgip.in[1]);
-    cgip.in[1] = -1;
+    system_close(handler->process.in[1]);
+    handler->process.in[1] = -1;
+    
+    handler->parser = cgi_parser_new(sn, rq);
+    
+    Event *readev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(readev, sizeof(Event));
+    readev->cookie = handler;
+    readev->fn = cgi_stdout_readevent;
+    readev->finish = cgi_event_finish;
+    
+    Event *stderr_readev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(stderr_readev, sizeof(Event));
+    stderr_readev->cookie = handler;
+    stderr_readev->fn = cgi_stderr_readevent;
+    stderr_readev->finish = NULL;
+    
+    Event *writeev = pool_malloc(sn->pool, sizeof(Event));
+    ZERO(writeev, sizeof(Event));
+    writeev->cookie = handler;
+    // TODO: fn
+    
+    handler->writeev = writeev;
     
-    // read from child
-    CGIResponseParser *parser = cgi_parser_new(sn, rq);
-    WSBool cgiheader = TRUE;
+    // add poll events for cgi stdout/stderr
+    int error = 0;
+    if(ev_pollin(sn->ev, handler->process.err[0], stderr_readev)) {
+        log_ereport(LOG_FAILURE, "send-cgi: stderr ev_pollin failed");
+        error = 1;
+    }
+    if(ev_pollin(sn->ev, handler->process.out[0], readev)) {
+        log_ereport(LOG_FAILURE, "send-cgi: stdout ev_pollin failed");
+        error = 1;
+    }
+    
+    if(error) {
+        log_ereport(LOG_FAILURE, "cgi-send: kill script: %s", path);
+        kill(handler->process.pid, SIGKILL);
+        cgi_parser_free(handler->parser);
+        return REQ_ABORTED;
+    }
+    
+    return REQ_PROCESSING;
+}
+
+int cgi_stdout_readevent(EventHandler *ev, Event *event) {
+    CGIHandler *handler = event->cookie;
+    CGIResponseParser *parser = handler->parser;
+    Session *sn = parser->sn;
+    Request *rq = parser->rq;
+    
+    char buf[4096]; // I/O buffer
+    ssize_t r;
     ssize_t wr = 0;
-    int result = REQ_PROCEED;
-    size_t response_length = 0;
-    while((r = read(cgip.out[0], buf, 4096)) > 0) {
-        if(cgiheader) {
+    
+    handler->result = REQ_PROCEED;
+    while((r = read(handler->process.out[0], buf, 4096)) > 0) {
+        if(parser->cgiheader) {
             size_t pos;
-            ret = cgi_parse_response(parser, buf, r, &pos);
+            int ret = cgi_parse_response(parser, buf, r, &pos);
             if(ret == -1) {
                 log_ereport(
                         LOG_FAILURE,
-                        "broken cgi script response: path: %s", path);
+                        "broken cgi script response: path: %s", handler->path);
                 protocol_status(sn, rq, 500, NULL);
-                result = REQ_ABORTED;
+                handler->result = REQ_ABORTED;
                 break;
             } else if(ret == 1) {
-                cgiheader = FALSE;
+                parser->cgiheader = FALSE;
                 if(parser->status > 0) {
                     protocol_status(sn, rq, parser->status, parser->msg);
                 }
                 http_start_response(sn, rq);
                 if(pos < r) {
-                    response_length += r-pos;
+                    parser->response_length += r-pos;
                     wr = net_write(sn->csd, &buf[pos], r-pos);
                     if(wr <= 0) {
-                        result = REQ_ABORTED;
+                        handler->result = REQ_ABORTED;
                         break;
                     }
                 }
             }
         } else {
-            response_length += r;
+            parser->response_length += r;
             wr = net_write(sn->csd, buf, r);
             if(wr <= 0) {
-                result = REQ_ABORTED;
+                handler->result = REQ_ABORTED;
                 break;
             }
         }
     }
+    if(r < 0 && errno == EWOULDBLOCK) {
+        event->events = EVENT_POLLIN;
+        return 1;
+    }
     
     char *ctlen_header = pblock_findkeyval(pb_key_content_length, rq->srvhdrs);
     if(ctlen_header) {
         int64_t ctlenhdr;
         if(util_strtoint(ctlen_header, &ctlenhdr)) {
-            if(ctlenhdr != response_length) {
+            if(ctlenhdr != parser->response_length) {
                 log_ereport(
                         LOG_FAILURE,
                         "cgi-send: script: %s: content length mismatch",
-                        path);
+                        handler->path);
                 rq->rq_attr.keep_alive = 0;
-                result = REQ_ABORTED;
+                handler->result = REQ_ABORTED;
             }
         }
     }
     
-    if(result == REQ_ABORTED) {
-        log_ereport(LOG_FAILURE, "cgi-send: kill script: %s", path);
-        kill(cgip.pid, SIGKILL);
+    return 0;
+}
+
+int cgi_stderr_readevent(EventHandler *ev, Event *event) {
+    CGIHandler *handler = event->cookie;
+    
+    char  buf[4096];
+    ssize_t r = read(handler->process.err[0], buf, 4096);
+    log_ereport(LOG_INFORM, "cgi pid %d %s stderr: %.*s", (int)handler->process.pid, handler->path, (int)r, buf);
+    
+    return 0;
+}
+
+int cgi_event_finish(EventHandler *ev, Event *event) {
+    CGIHandler *handler = event->cookie;
+    CGIResponseParser *parser = handler->parser;
+    Session *sn = parser->sn;
+    Request *rq = parser->rq;
+    
+    if(handler->result == REQ_ABORTED) {
+        log_ereport(LOG_FAILURE, "cgi-send: kill script: %s", handler->path);
+        kill(handler->process.pid, SIGKILL);
     }
     
-    int exit_code = cgi_close(&cgip);
+    int exit_code = cgi_close(&handler->process);
     if(exit_code != 0) {
-        log_ereport(LOG_FAILURE, "send-cgi: script: %s exited with code %d", path, exit_code);
-        ret = REQ_ABORTED;
+        log_ereport(LOG_FAILURE, "send-cgi: script: %s exited with code %d", handler->path, exit_code);
+        handler->result = REQ_ABORTED;
     }
       
     cgi_parser_free(parser);
-    return result;
+    // return to nsapi loop
+    nsapi_function_return(sn, rq, handler->result);
+    return 0;
 }
 
 int cgi_start(CGIProcess *p, char *path, char *const argv[], char *const envp[]) {
-    if(pipe(p->in) || pipe(p->out)) {
+    if(pipe(p->in) || pipe(p->out) || pipe(p->err)) {
         log_ereport(
                 LOG_FAILURE,
                 "send-cgi: cannot create pipe: %s",
@@ -241,6 +324,10 @@
             perror("cgi_start: dup2");
             exit(EXIT_FAILURE);
         }
+        if(dup2(p->err[1], STDERR_FILENO) == -1) {
+            perror("cgi_start: dup2");
+            exit(EXIT_FAILURE);
+        }
         
         // we need to close this unused pipe
         // otherwise stdin cannot reach EOF
@@ -251,7 +338,9 @@
     } else {
         // parent  
         system_close(p->out[1]);
+        system_close(p->err[1]);
         p->out[1] = -1;
+        p->err[1] = -1;
     }
     
     return REQ_PROCEED;
@@ -273,6 +362,12 @@
     if(p->out[1] != -1) {
         system_close(p->out[1]);
     }
+    if(p->err[0] != -1) {
+        system_close(p->err[0]);
+    }
+    if(p->err[1] != -1) {
+        system_close(p->err[1]);
+    }
     
     return status;
 }
@@ -283,6 +378,8 @@
     parser->rq = rq;
     parser->status = 0;
     parser->msg = NULL;
+    parser->response_length = 0;
+    parser->cgiheader = TRUE;
     cxBufferInit(&parser->tmp, NULL, 64, pool_allocator(sn->pool), CX_BUFFER_AUTO_EXTEND|CX_BUFFER_FREE_CONTENTS);
     return parser;
 }
--- a/src/server/safs/cgi.h	Sat Nov 12 17:34:58 2022 +0100
+++ b/src/server/safs/cgi.h	Sat Nov 12 20:50:45 2022 +0100
@@ -39,6 +39,7 @@
 typedef struct {
     int in[2];
     int out[2];
+    int err[2];
     pid_t pid;
 } CGIProcess;
 
@@ -48,7 +49,18 @@
     CxBuffer  tmp;
     int       status;
     char      *msg;
+    size_t    response_length;
+    WSBool    cgiheader;
 } CGIResponseParser;
+
+typedef struct CGIHandler {
+    CGIProcess process;
+    CGIResponseParser *parser;
+    char *path;
+    Event *writeev;
+    char *stderr_tmp;
+    int result;
+} CGIHandler;
     
 int send_cgi(pblock *pb, Session *sn, Request *rq);
 
@@ -58,6 +70,10 @@
 
 int cgi_close(CGIProcess *p);
 
+int cgi_stdout_readevent(EventHandler *ev, Event *event);
+int cgi_stderr_readevent(EventHandler *ev, Event *event);
+int cgi_event_finish(EventHandler *ev, Event *event);
+
 CGIResponseParser* cgi_parser_new(Session *sn, Request *rq);
 void cgi_parser_free(CGIResponseParser *parser);
 int cgi_parse_response(CGIResponseParser *parser, char *buf, size_t len, size_t *bpos);
--- a/src/server/util/io.c	Sat Nov 12 17:34:58 2022 +0100
+++ b/src/server/util/io.c	Sat Nov 12 20:50:45 2022 +0100
@@ -107,31 +107,31 @@
 
 
 /*
- * Sycx_stream implementation
+ * Sysstream implementation
  */
 
-IOStream* sycx_stream_new(pool_handle_t *pool, SYS_SOCKET fd) {
-    Sycx_stream *st = pool_malloc(pool, sizeof(Sycx_stream));
+IOStream* Sysstream_new(pool_handle_t *pool, SYS_SOCKET fd) {
+    Sysstream *st = pool_malloc(pool, sizeof(Sysstream));
     st->st = native_io_funcs;
     st->fd = fd;
     return (IOStream*)st;
 }
 
 #ifdef XP_UNIX
-ssize_t net_sys_write(Sycx_stream *st, void *buf, size_t nbytes) {
+ssize_t net_sys_write(Sysstream *st, void *buf, size_t nbytes) {
     return write(st->fd, buf, nbytes);
 }
 
-ssize_t net_sys_writev(Sycx_stream *st, struct iovec *iovec, int iovcnt) {
+ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt) {
     return writev(st->fd, iovec, iovcnt);
 }
 
-ssize_t net_sys_read(Sycx_stream *st, void *buf, size_t nbytes) {
+ssize_t net_sys_read(Sysstream *st, void *buf, size_t nbytes) {
     return read(st->fd, buf, nbytes);
 }
 
 #ifdef WS_SENDFILE
-ssize_t net_sys_sendfile(Sycx_stream *st, sendfiledata *sfd) {
+ssize_t net_sys_sendfile(Sysstream *st, sendfiledata *sfd) {
     ssize_t ret = 0;
     off_t fileoffset = sfd->offset;
     if(sfd->fd->fd != -1) {
@@ -178,11 +178,11 @@
 }
 #endif
 
-void net_sys_close(Sycx_stream *st) {
+void net_sys_close(Sysstream *st) {
     system_close(st->fd);
 }
 
-void net_sys_setmode(Sycx_stream *st, int mode) {
+void net_sys_setmode(Sysstream *st, int mode) {
     int flags;
     if (-1 == (flags = fcntl(st->fd, F_GETFL, 0))) {
         flags = 0;
@@ -200,7 +200,7 @@
     }
 }
 
-int net_sys_poll(Sycx_stream *st, EventHandler *ev, int events, Event *cb) {
+int net_sys_poll(Sysstream *st, EventHandler *ev, int events, Event *cb) {
     switch(events) {
         default: return -1;
         case IO_POLL_NONE: return ev_remove_poll(ev, st->fd);
@@ -212,7 +212,7 @@
 
 #elif defined(XP_WIN32)
 
-ssize_t net_sys_write(Sycx_stream *st, void *buf, size_t nbytes) {
+ssize_t net_sys_write(Sysstream *st, void *buf, size_t nbytes) {
     int ret = send(st->fd, buf, nbytes, 0);
     if(ret == SOCKET_ERROR) {
         return IO_ERROR;
@@ -220,11 +220,11 @@
     return ret;
 }
 
-ssize_t net_sys_writev(Sycx_stream *st, struct iovec *iovec, int iovcnt) {
+ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt) {
     // TODO
 }
 
-ssize_t net_sys_read(Sycx_stream *st, void *buf, size_t nbytes) {
+ssize_t net_sys_read(Sysstream *st, void *buf, size_t nbytes) {
     int ret = recv(st->fd, buf, nbytes, 0);
     if(ret == SOCKET_ERROR) {
         return IO_ERROR;
@@ -232,11 +232,11 @@
     return ret;
 }
 
-ssize_t net_sys_sendfile(Sycx_stream *st, sendfiledata *sfd) {
+ssize_t net_sys_sendfile(Sysstream *st, sendfiledata *sfd) {
     // TODO
 }
 
-void net_sys_close(Sycx_stream *st) {
+void net_sys_close(Sysstream *st) {
     closesocket(st->fd);
 }
 
--- a/src/server/util/io.h	Sat Nov 12 17:34:58 2022 +0100
+++ b/src/server/util/io.h	Sat Nov 12 20:50:45 2022 +0100
@@ -56,7 +56,7 @@
 #define IO_POLL_OUT         2
     
 typedef struct IOStream     IOStream;
-typedef struct Sycx_stream    Sycx_stream;
+typedef struct Sysstream    Sysstream;
 typedef struct HttpStream   HttpStream;
 
 typedef ssize_t(*io_write_f)(IOStream *, void *, size_t);
@@ -80,7 +80,7 @@
     int           io_errno;
 };
 
-struct Sycx_stream {
+struct Sysstream {
     IOStream st;
 #ifdef XP_UNIX
     int      fd;
@@ -154,15 +154,15 @@
 
 
 /* system stream */
-IOStream* sycx_stream_new(pool_handle_t *pool, SYS_SOCKET fd);
+IOStream* Sysstream_new(pool_handle_t *pool, SYS_SOCKET fd);
 
-ssize_t net_sys_write(Sycx_stream *st, void *buf, size_t nbytes);
-ssize_t net_sys_writev(Sycx_stream *st, struct iovec *iovec, int iovcnt);
-ssize_t net_sys_read(Sycx_stream *st, void *buf, size_t nbytes);
-ssize_t net_sys_sendfile(Sycx_stream *st, sendfiledata *sfd);
-void net_sys_close(Sycx_stream *st);
-void net_sys_setmode(Sycx_stream *st, int mode);
-int net_sys_poll(Sycx_stream *st, EventHandler *ev, int events, Event *cb);
+ssize_t net_sys_write(Sysstream *st, void *buf, size_t nbytes);
+ssize_t net_sys_writev(Sysstream *st, struct iovec *iovec, int iovcnt);
+ssize_t net_sys_read(Sysstream *st, void *buf, size_t nbytes);
+ssize_t net_sys_sendfile(Sysstream *st, sendfiledata *sfd);
+void net_sys_close(Sysstream *st);
+void net_sys_setmode(Sysstream *st, int mode);
+int net_sys_poll(Sysstream *st, EventHandler *ev, int events, Event *cb);
 
 /* http stream */
 IOStream* httpstream_new(pool_handle_t *pool, IOStream *fd);

mercurial