Tue, 13 Aug 2024 19:59:42 +0200
new linux event_send implementation, replace event pipes with eventfd
--- a/src/server/daemon/event.h Mon Aug 12 21:20:17 2024 +0200 +++ b/src/server/daemon/event.h Tue Aug 13 19:59:42 2024 +0200 @@ -36,7 +36,7 @@ extern "C" { #endif -#define EV_MAX_EVENTS 32 +#define EV_MAX_EVENTS 64 #define EV_IDLE_TIMEOUT 120 #define EV_IDLE_LOOP_CTN 16
--- a/src/server/daemon/event_linux.c Mon Aug 12 21:20:17 2024 +0200 +++ b/src/server/daemon/event_linux.c Tue Aug 13 19:59:42 2024 +0200 @@ -32,6 +32,9 @@ #include <sys/epoll.h> #include <sys/eventfd.h> +#include <cx/array_list.h> +#include <cx/linked_list.h> + #include "../util/systhr.h" #include "../util/atomic.h" @@ -60,18 +63,31 @@ return NULL; } - int eventpipe[2]; - if(pipe(eventpipe)) { - log_ereport(LOG_FAILURE, "evhandler_create: pipe: %s", strerror(errno)); + handler->event_fd = eventfd(0, EFD_NONBLOCK); + if(handler->event_fd < 0) { + log_ereport(LOG_FAILURE, "evhandler_create: eventfd: %s", strerror(errno)); + return NULL; + } + + EventQueue *queue = malloc(sizeof(EventQueue)); + if(!queue) { return NULL; } - handler->eventin = eventpipe[0]; - handler->eventout = eventpipe[1]; + queue->numevents = 0; + queue->next = NULL; + handler->queue_begin = queue; + handler->queue_end = queue; + handler->num_reserve = 0; + + if(pthread_mutex_init(&handler->queue_lock, NULL)) { + log_ereport(LOG_FAILURE, "evhandler_create: cannot initialize mutex"); + return NULL; + } struct epoll_event epev; epev.events = EPOLLIN | EPOLLET; // input event, edge triggered epev.data.ptr = NULL; - if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) { + if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->event_fd, &epev)) { log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno)); return NULL; } @@ -94,13 +110,50 @@ struct epoll_event events[EV_MAX_EVENTS]; Event* finished[EV_MAX_EVENTS]; + size_t queue_len = 0; + int loop_ctn = 0; for(;;) { - /* wait for events */ - int ret = epoll_wait(ep, events, 16, EV_IDLE_TIMEOUT * 1000); - if(ret == -1 && errno != EINTR) { - log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); - continue; + // if ev->event_queue contains events, we process them first + // otherwise we get events from epoll + int ret = 0; + if(queue_len > 0) { + pthread_mutex_lock(&ev->queue_lock); + + EventQueue *queue = ev->queue_begin; + queue_len = queue->numevents; + + // queue_len cannot be bigger than EV_MAX_EVENTS + // get events from the queue + for(int i=0;i<queue_len;i++) { + events[i].events = 0; + events[i].data.ptr = queue->events[i]; + } + + queue->numevents = 0; + if(queue->next) { + ev->queue_begin = ev->queue_begin->next; + + // more than 1 queue block available, remove first block + if(ev->num_reserve < EV_QUEUE_RESERVE) { + ev->reserve_block[ev->num_reserve++] = queue; + queue->next = NULL; + } else { + free(queue); + } + } + + ret = queue_len; + queue_len = ev->queue_begin->numevents; + + pthread_mutex_unlock(&ev->queue_lock); + } else { + // wait for events + ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000); + if(ret == -1 && errno != EINTR) { + log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); + continue; + } } int numfinished = 0; @@ -108,28 +161,17 @@ for(int i=0;i<ret;i++) { Event *event = events[i].data.ptr; if(!event) { - char ebuf[sizeof(Event*)]; - int ebufpos = 0; - char *b = ebuf; - while(ebufpos < sizeof(Event*)) { - ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos); - if(r < 0) { - break; - } - ebufpos += r; + // the only epoll_event without Event ptr is from eventfd + + uint64_t eventfd_r = 0; + ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t)); + if(r > 0) { + queue_len = eventfd_r; + } else { + log_ereport(LOG_FAILURE, "eventhandler: eventfd read failed: %s", strerror(errno)); } - if(ebufpos == sizeof(Event*)) { - intptr_t *p = (intptr_t*)b; - *(&event) = (Event*)*p; - /* - if(event->fn) { - if(!event->fn(ev, event) && event->finish) { - event->finish(ev, event); - } - }*/ - } else { - continue; // should not happen - } + + continue; } if(event->fn) { @@ -232,11 +274,52 @@ EventHandlerLinux *ev = (EventHandlerLinux*)h; event->object = 0; event->events = 0; - ssize_t r = write(ev->eventout, &event, sizeof(Event*)); - if(r < sizeof(Event*)) { - log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno)); + + int err = 0; + pthread_mutex_lock(&ev->queue_lock); + + // add event to the last block + EventQueue *block = ev->queue_end; + if(block->numevents >= EV_MAX_EVENTS) { + // last block is full + // create a new block or just use a reserved block + if(ev->num_reserve > 0) { + block = ev->reserve_block[ev->num_reserve-1]; + ev->num_reserve--; + } else { + block = malloc(sizeof(EventQueue)); + if(!block) { + block = NULL; + err = 1; + } + } + + if(block) { + ev->queue_end->next = block; + ev->queue_end = block; + + block->numevents = 0; + block->next = NULL; + } } - return r > 0 ? 0 : 1; + + if(block) { + block->events[block->numevents++] = event; + } + + + pthread_mutex_unlock(&ev->queue_lock); + + if(!err) { + uint64_t data = 1; + ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t)); + if(r == 0) { + log_ereport(LOG_FAILURE, "eventhandler: failed to send event: %s", strerror(errno)); + err = 1; + } + } + + return err; } // TODO: remove this fake aio
--- a/src/server/daemon/event_linux.h Mon Aug 12 21:20:17 2024 +0200 +++ b/src/server/daemon/event_linux.h Tue Aug 13 19:59:42 2024 +0200 @@ -32,10 +32,16 @@ #include "event.h" #include <inttypes.h> +#include <cx/list.h> + #ifdef __cplusplus extern "C" { #endif +#define EV_QUEUE_RESERVE 8 + +typedef struct EventQueue EventQueue; + typedef struct EventHandlerLinux { /* * base eventhandler elements (fnreturn, watchlist) @@ -46,16 +52,58 @@ * epoll fd */ int ep; + /* - * pipe read fd + * eventfd for notifying that new elements + * in the event_queue are available */ - int eventin; + int event_fd; + + /* + * queue for custom events (event_send) + * each block can contain up to EV_MAX_EVENTS events + * the event loop should not retrieve more than one block + * + * first queue block + */ + EventQueue *queue_begin; + + /* + * last queue block + */ + EventQueue *queue_end; + /* - * pipe write fd + * allocated unused blocks + */ + EventQueue *reserve_block[EV_QUEUE_RESERVE]; + + /* + * number of unused blocks + */ + size_t num_reserve; + + /* + * mutex for event_queue */ - int eventout; + pthread_mutex_t queue_lock; } EventHandlerLinux; +struct EventQueue { + /* + * array of events + */ + Event *events[EV_MAX_EVENTS]; + /* + * number of events + */ + size_t numevents; + /* + * next event block + */ + EventQueue *next; +}; + void ev_handle_events(EventHandlerLinux *ev); int ev_convert2sys_events(int events);