use non-blocking IO for pipes and socket in send_cgi

Sun, 13 Nov 2022 12:58:25 +0100

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Sun, 13 Nov 2022 12:58:25 +0100
changeset 433
39fe86ae4db0
parent 432
7c9137f9e7f9
child 434
ff576305ae6e

use non-blocking IO for pipes and socket in send_cgi

src/server/daemon/event_bsd.c file | annotate | diff | comparison | revisions
src/server/daemon/protocol.c file | annotate | diff | comparison | revisions
src/server/safs/cgi.c file | annotate | diff | comparison | revisions
src/server/safs/cgi.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/event_bsd.c	Sun Nov 13 10:57:38 2022 +0100
+++ b/src/server/daemon/event_bsd.c	Sun Nov 13 12:58:25 2022 +0100
@@ -30,6 +30,7 @@
 #include <stdlib.h>
 
 #include "../util/atomic.h"
+#include "../util/io.h"
 
 #include "event_bsd.h"
 
@@ -182,3 +183,16 @@
     }
     return event_send(cb->evhandler, cb->event);
 }
+
+
+int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event);
+}
+
+int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event);
+}
+
+int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL);
+}
--- a/src/server/daemon/protocol.c	Sun Nov 13 10:57:38 2022 +0100
+++ b/src/server/daemon/protocol.c	Sun Nov 13 12:58:25 2022 +0100
@@ -385,6 +385,7 @@
     CxBuffer *buf = &writer->buf;
     
     // flush buffer to the socket
+    int ret = 0;
     while(buf->pos < buf->size) {
         int w = conn->write(conn, buf->space + buf->pos, buf->size - buf->pos);
         if(w <= 0) {
@@ -397,17 +398,20 @@
                     return 1;
                 }
             }
-            return -1;
+            ret = -1;
+            break;
         }
         buf->pos += w;
     }
      
-    writer->rq->senthdrs = 1;
+    if(ret == 0) {
+        writer->rq->senthdrs = 1;
+    }
     
     cxBufferDestroy(buf);
     pool_free(writer->sn->pool, writer);
     
-    return 0;
+    return ret;
 }
 
 
--- a/src/server/safs/cgi.c	Sun Nov 13 10:57:38 2022 +0100
+++ b/src/server/safs/cgi.c	Sun Nov 13 12:58:25 2022 +0100
@@ -179,6 +179,8 @@
     handler->writeev = writeev;
     handler->stderrev = stderr_readev;
     
+    net_setnonblock(sn->csd, 1);
+    
     // add poll events for cgi stdout/stderr
     int error = 0;
     if(ev_pollin(sn->ev, handler->process.err[0], stderr_readev)) {
@@ -200,15 +202,96 @@
     return REQ_PROCESSING;
 }
 
+static int cgi_try_write_flush(CGIHandler *handler, Session *sn) {
+    ssize_t wr = 0;
+    while(
+            handler->writebuf_size - handler->writebuf_pos > 0 &&
+            (wr = net_write(
+                            sn->csd,
+                            handler->writebuf + handler->writebuf_pos,
+                            handler->writebuf_size - handler->writebuf_pos))
+             > 0)
+    {
+        handler->writebuf_pos += wr;
+    }
+    if(wr < 0) {
+        if(errno != EWOULDBLOCK) {
+            handler->result = REQ_ABORTED;
+        }
+        return 1;
+    }
+    return 0;
+}
+
+static int cgi_try_write(CGIHandler *handler, EventHandler *ev, Session *sn, char *buf, size_t size) {
+    size_t pos = 0;
+    ssize_t wr = 0;
+    while(size - pos > 0 && (wr = net_write(sn->csd, buf + pos, size - pos)) > 0) {
+        pos += wr;
+    }
+    if(wr < 0) {
+        if(errno == EWOULDBLOCK) {
+            // copy remaining bytes to the write buffer
+            // we assume there are no remaining bytes in writebuf
+            size_t remaining = size-pos;
+            if(remaining <= handler->writebuf_alloc) {
+                memcpy(handler->writebuf, buf+pos, remaining);
+            } else {
+                handler->writebuf_alloc = size > 4096 ? size : 4096;
+                handler->writebuf = pool_realloc(sn->pool, handler->writebuf, handler->writebuf_alloc);
+                if(!handler->writebuf) {
+                    handler->result = REQ_ABORTED;
+                    return 1;
+                }
+            }
+            handler->writebuf_size = remaining;
+            handler->writebuf_pos = 0;
+            
+            // initialize poll, if it isn't already active
+            if(!handler->poll_out) {
+                if(event_pollout(ev, sn->csd, handler->writeev)) {
+                    handler->result = REQ_ABORTED;
+                    return 1;
+                }
+                handler->poll_out = TRUE;
+            }
+            return 1;
+        }
+        handler->result = REQ_ABORTED;
+        return 1;
+    }
+    
+    return 0;
+}
+
 int cgi_stdout_readevent(EventHandler *ev, Event *event) {
     CGIHandler *handler = event->cookie;
+    
+    return cgi_read_output(handler, ev);
+}
+
+int cgi_writeevent(EventHandler *ev, Event *event) {
+    CGIHandler *handler = event->cookie;
+    
+    // cgi_read_output will try to flush the buffer
+    return cgi_read_output(handler, ev);
+}
+
+
+
+int cgi_read_output(CGIHandler *handler, EventHandler *ev) {
     CGIResponseParser *parser = handler->parser;
     Session *sn = parser->sn;
     Request *rq = parser->rq;
     
+    // try to flush handler->writebuf
+    // if writebuf is empty, this does nothing and returns 0
+    if(cgi_try_write_flush(handler, sn)) {
+        return handler->result == REQ_ABORTED ? 0 : 1;
+    }
+    
     char buf[4096]; // I/O buffer
     ssize_t r;
-    ssize_t wr = 0;
     
     handler->result = REQ_PROCEED;
     while((r = read(handler->process.out[0], buf, 4096)) > 0) {
@@ -221,28 +304,51 @@
                         "broken cgi script response: path: %s", handler->path);
                 protocol_status(sn, rq, 500, NULL);
                 handler->result = REQ_ABORTED;
-                break;
+                return 0;
             } else if(ret == 1) {
+                WS_ASSERT(pos <= r);
+                
+                parser->response_length += r-pos;
+                
                 parser->cgiheader = FALSE;
                 if(parser->status > 0) {
                     protocol_status(sn, rq, parser->status, parser->msg);
                 }
-                http_start_response(sn, rq);
+                
+                handler->response = http_create_response(sn, rq);
+                if(!handler->response) {
+                    handler->result = REQ_ABORTED;
+                    return 0;
+                }
+                
+                int send_response = http_send_response(handler->response);
+                if(send_response < 0) {
+                    handler->result = REQ_ABORTED;
+                    break;
+                } else if(send_response == 1) {
+                    // EWOULDBLOCK
+                    if(!handler->poll_out) {
+                        if(event_pollout(ev, sn->csd, handler->writeev)) {
+                            handler->result = REQ_ABORTED;
+                            return 0;
+                        }
+                        handler->poll_out = TRUE;
+                        return 1;
+                    }
+                } else {
+                    handler->response = NULL;
+                }
+                
                 if(pos < r) {
-                    parser->response_length += r-pos;
-                    wr = net_write(sn->csd, &buf[pos], r-pos);
-                    if(wr <= 0) {
-                        handler->result = REQ_ABORTED;
-                        break;
+                    if(cgi_try_write(handler, ev, sn, &buf[pos], r-pos)) {
+                        return handler->result == REQ_ABORTED ? 0 : 1;
                     }
                 }
             }
         } else {
             parser->response_length += r;
-            wr = net_write(sn->csd, buf, r);
-            if(wr <= 0) {
-                handler->result = REQ_ABORTED;
-                break;
+            if(cgi_try_write(handler, ev, sn, buf, r)) {
+                return handler->result == REQ_ABORTED ? 0 : 1;
             }
         }
     }
@@ -250,21 +356,7 @@
         return 1;
     }
     
-    char *ctlen_header = pblock_findkeyval(pb_key_content_length, rq->srvhdrs);
-    if(ctlen_header) {
-        int64_t ctlenhdr;
-        if(util_strtoint(ctlen_header, &ctlenhdr)) {
-            if(ctlenhdr != parser->response_length) {
-                log_ereport(
-                        LOG_FAILURE,
-                        "cgi-send: script: %s: content length mismatch",
-                        handler->path);
-                rq->rq_attr.keep_alive = 0;
-                handler->result = REQ_ABORTED;
-            }
-        }
-    }
-    
+    handler->read_output_finished = TRUE;
     return 0;
 }
 
@@ -337,7 +429,7 @@
     CGIResponseParser *parser = handler->parser;
     Session *sn = parser->sn;
     Request *rq = parser->rq;
-     
+      
     if(handler->result == REQ_ABORTED) {
         log_ereport(LOG_FAILURE, "cgi-send: kill script: %s", handler->path);
         kill(handler->process.pid, SIGKILL);
@@ -349,6 +441,11 @@
         handler->stderrev->finish = cgi_event_finish;
         return 0;
     }
+    if(handler->poll_out && !handler->send_response_finished) {
+        // send response is still active
+        handler->writeev->finish = cgi_event_finish;
+        return 0;
+    }
     
     int exit_code = cgi_close(&handler->process);
     if(exit_code != 0) {
@@ -357,6 +454,27 @@
     }
       
     cgi_parser_free(parser);
+    
+    // check if content-length set by the cgi script matches the number
+    // of writes, that were written to the stream
+    // this ensures, that broken cgi scripts don't break the connection
+    char *ctlen_header = pblock_findkeyval(pb_key_content_length, rq->srvhdrs);
+    if(ctlen_header) {
+        int64_t ctlenhdr;
+        if(util_strtoint(ctlen_header, &ctlenhdr)) {
+            if(ctlenhdr != parser->response_length) {
+                log_ereport(
+                        LOG_FAILURE,
+                        "cgi-send: script: %s: content length mismatch",
+                        handler->path);
+                rq->rq_attr.keep_alive = 0;
+                handler->result = REQ_ABORTED;
+            }
+        }
+    }
+    
+    net_setnonblock(sn->csd, 0);
+    
     // return to nsapi loop
     nsapi_function_return(sn, rq, handler->result);
     return 0;
--- a/src/server/safs/cgi.h	Sun Nov 13 10:57:38 2022 +0100
+++ b/src/server/safs/cgi.h	Sun Nov 13 12:58:25 2022 +0100
@@ -56,12 +56,20 @@
 typedef struct CGIHandler {
     CGIProcess process;
     CGIResponseParser *parser;
+    HttpResponseWriter *response;
     char *path;
     Event *writeev;
     Event *stderrev;
     char *stderr_tmp;
     int stderr_tmplen;
+    char *writebuf;
+    size_t writebuf_alloc;
+    size_t writebuf_size;
+    size_t writebuf_pos;
     WSBool stderr_finished;
+    WSBool read_output_finished;
+    WSBool send_response_finished;
+    WSBool poll_out;
     int result;
 } CGIHandler;
     
@@ -76,6 +84,9 @@
 int cgi_stdout_readevent(EventHandler *ev, Event *event);
 int cgi_stderr_readevent(EventHandler *ev, Event *event);
 int cgi_event_finish(EventHandler *ev, Event *event);
+int cgi_writeevent(EventHandler *ev, Event *event);
+
+int cgi_read_output(CGIHandler *handler, EventHandler *ev);
 
 CGIResponseParser* cgi_parser_new(Session *sn, Request *rq);
 void cgi_parser_free(CGIResponseParser *parser);

mercurial