new linux event_send implementation, replace event pipes with eventfd

Tue, 13 Aug 2024 19:59:42 +0200

author
Olaf Wintermann <olaf.wintermann@gmail.com>
date
Tue, 13 Aug 2024 19:59:42 +0200
changeset 545
720893ec7d48
parent 544
27684460629f
child 546
5494c28db896

new linux event_send implementation, replace event pipes with eventfd

src/server/daemon/event.h file | annotate | diff | comparison | revisions
src/server/daemon/event_linux.c file | annotate | diff | comparison | revisions
src/server/daemon/event_linux.h file | annotate | diff | comparison | revisions
--- a/src/server/daemon/event.h	Mon Aug 12 21:20:17 2024 +0200
+++ b/src/server/daemon/event.h	Tue Aug 13 19:59:42 2024 +0200
@@ -36,7 +36,7 @@
 extern "C" {
 #endif
     
-#define EV_MAX_EVENTS    32
+#define EV_MAX_EVENTS    64
 #define EV_IDLE_TIMEOUT  120
 #define EV_IDLE_LOOP_CTN 16
     
--- a/src/server/daemon/event_linux.c	Mon Aug 12 21:20:17 2024 +0200
+++ b/src/server/daemon/event_linux.c	Tue Aug 13 19:59:42 2024 +0200
@@ -32,6 +32,9 @@
 #include <sys/epoll.h>
 #include <sys/eventfd.h>
 
+#include <cx/array_list.h>
+#include <cx/linked_list.h>
+
 #include "../util/systhr.h"
 #include "../util/atomic.h"
 
@@ -60,18 +63,31 @@
             return NULL;
         }
         
-        int eventpipe[2];
-        if(pipe(eventpipe)) {
-            log_ereport(LOG_FAILURE, "evhandler_create: pipe: %s", strerror(errno));
+        handler->event_fd = eventfd(0, EFD_NONBLOCK);
+        if(handler->event_fd < 0) {
+            log_ereport(LOG_FAILURE, "evhandler_create: eventfd: %s", strerror(errno));
+            return NULL;
+        }
+        
+        EventQueue *queue = malloc(sizeof(EventQueue));
+        if(!queue) {
             return NULL;
         }
-        handler->eventin = eventpipe[0];
-        handler->eventout = eventpipe[1];
+        queue->numevents = 0;
+        queue->next = NULL;
+        handler->queue_begin = queue;
+        handler->queue_end = queue;
+        handler->num_reserve = 0;
+        
+        if(pthread_mutex_init(&handler->queue_lock, NULL)) {
+            log_ereport(LOG_FAILURE, "evhandler_create: cannot initialize mutex");
+            return NULL;
+        }
         
         struct epoll_event epev;
         epev.events = EPOLLIN | EPOLLET; // input event, edge triggered
         epev.data.ptr = NULL;
-        if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) {
+        if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->event_fd, &epev)) {
             log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno));
             return NULL;
         }
@@ -94,13 +110,50 @@
     struct epoll_event events[EV_MAX_EVENTS];
     Event* finished[EV_MAX_EVENTS];
     
+    size_t queue_len = 0;
+    
     int loop_ctn = 0;
     for(;;) {
-        /* wait for events */
-        int ret = epoll_wait(ep, events, 16, EV_IDLE_TIMEOUT * 1000);
-        if(ret == -1 && errno != EINTR) {
-            log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno));
-            continue;
+        // if ev->event_queue contains events, we process them first
+        // otherwise we get events from epoll
+        int ret = 0;
+        if(queue_len > 0) {
+            pthread_mutex_lock(&ev->queue_lock);
+            
+            EventQueue *queue = ev->queue_begin;
+            queue_len = queue->numevents;
+            
+            // queue_len cannot be bigger than EV_MAX_EVENTS
+            // get events from the queue
+            for(int i=0;i<queue_len;i++) {
+                events[i].events = 0;
+                events[i].data.ptr = queue->events[i];
+            }
+            
+            queue->numevents = 0;
+            if(queue->next) {
+                ev->queue_begin = ev->queue_begin->next;
+                
+                // more than 1 queue block available, remove first block
+                if(ev->num_reserve < EV_QUEUE_RESERVE) {
+                    ev->reserve_block[ev->num_reserve++] = queue;
+                    queue->next = NULL;
+                } else {
+                    free(queue);
+                }
+            }
+            
+            ret = queue_len;
+            queue_len = ev->queue_begin->numevents;
+            
+            pthread_mutex_unlock(&ev->queue_lock);
+        } else {
+            // wait for events
+            ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000);
+            if(ret == -1 && errno != EINTR) {
+                log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno));
+                continue;
+            }
         }
               
         int numfinished = 0;
@@ -108,28 +161,17 @@
         for(int i=0;i<ret;i++) {
             Event *event = events[i].data.ptr;
             if(!event) {
-                char ebuf[sizeof(Event*)];
-                int ebufpos = 0;
-                char *b = ebuf;
-                while(ebufpos < sizeof(Event*)) {
-                    ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos);
-                    if(r < 0) {
-                        break;
-                    }
-                    ebufpos += r;
+                // the only epoll_event without Event ptr is from eventfd
+
+                uint64_t eventfd_r = 0;
+                ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t));
+                if(r > 0) {
+                    queue_len = eventfd_r;
+                } else {
+                    log_ereport(LOG_FAILURE, "eventhandler: eventfd read failed: %s", strerror(errno));
                 }
-                if(ebufpos == sizeof(Event*)) {
-                    intptr_t *p = (intptr_t*)b;
-                    *(&event) = (Event*)*p;
-                    /*
-                    if(event->fn) {
-                        if(!event->fn(ev, event) && event->finish) {
-                            event->finish(ev, event);
-                        }
-                    }*/
-                } else {
-                    continue; // should not happen
-                }        
+                
+                continue;
             }
             
             if(event->fn) {
@@ -232,11 +274,52 @@
     EventHandlerLinux *ev = (EventHandlerLinux*)h;
     event->object = 0;
     event->events = 0;
-    ssize_t r = write(ev->eventout, &event, sizeof(Event*));
-    if(r < sizeof(Event*)) {
-        log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno));
+    
+    int err = 0;
+    pthread_mutex_lock(&ev->queue_lock);
+    
+    // add event to the last block
+    EventQueue *block = ev->queue_end;
+    if(block->numevents >= EV_MAX_EVENTS) {
+        // last block is full
+        // create a new block or just use a reserved block
+        if(ev->num_reserve > 0) {
+            block = ev->reserve_block[ev->num_reserve-1];
+            ev->num_reserve--;
+        } else {
+            block = malloc(sizeof(EventQueue));
+            if(!block) {
+                block = NULL;
+                err = 1;
+            }
+        }
+        
+        if(block) {
+            ev->queue_end->next = block;
+            ev->queue_end = block;
+            
+            block->numevents = 0;
+            block->next = NULL;
+        }
     }
-    return r > 0 ? 0 : 1;
+    
+    if(block) {
+        block->events[block->numevents++] = event;
+    }
+    
+    
+    pthread_mutex_unlock(&ev->queue_lock);
+    
+    if(!err) {
+        uint64_t data = 1;
+        ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t));
+        if(r == 0) {
+            log_ereport(LOG_FAILURE, "eventhandler: failed to send event: %s", strerror(errno));
+            err = 1;
+        }
+    }
+    
+    return err;
 }
 
 // TODO: remove this fake aio
--- a/src/server/daemon/event_linux.h	Mon Aug 12 21:20:17 2024 +0200
+++ b/src/server/daemon/event_linux.h	Tue Aug 13 19:59:42 2024 +0200
@@ -32,10 +32,16 @@
 #include "event.h"
 #include <inttypes.h>
 
+#include <cx/list.h>
+
 #ifdef	__cplusplus
 extern "C" {
 #endif
     
+#define EV_QUEUE_RESERVE 8
+    
+typedef struct EventQueue EventQueue;
+    
 typedef struct EventHandlerLinux {
     /*
      * base eventhandler elements (fnreturn, watchlist)
@@ -46,16 +52,58 @@
      * epoll fd
      */
     int ep;
+    
     /*
-     * pipe read fd
+     * eventfd for notifying that new elements
+     * in the event_queue are available
      */
-    int eventin;
+    int event_fd;
+    
+    /*
+     * queue for custom events (event_send)
+     * each block can contain up to EV_MAX_EVENTS events
+     * the event loop should not retrieve more than one block
+     * 
+     * first queue block
+     */
+    EventQueue *queue_begin;
+    
+    /*
+     * last queue block
+     */
+    EventQueue *queue_end;
+    
     /*
-     * pipe write fd
+     * allocated unused blocks
+     */
+    EventQueue *reserve_block[EV_QUEUE_RESERVE];
+    
+    /*
+     * number of unused blocks
+     */
+    size_t num_reserve;
+    
+    /*
+     * mutex for event_queue
      */
-    int eventout;
+    pthread_mutex_t queue_lock;
 } EventHandlerLinux;
 
+struct EventQueue {
+    /*
+     * array of events
+     */
+    Event *events[EV_MAX_EVENTS];
+    /*
+     * number of events
+     */
+    size_t numevents;
+    /*
+     * next event block
+     */
+    EventQueue *next;
+};
+
 void ev_handle_events(EventHandlerLinux *ev);
 
 int ev_convert2sys_events(int events);

mercurial