Wed, 27 Nov 2024 23:00:07 +0100
add TODO to use a future ucx feature
/* * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. * * Copyright 2013 Olaf Wintermann. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ #include <stdio.h> #include <stdlib.h> #include <errno.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <signal.h> #include <cx/array_list.h> #include <cx/linked_list.h> #include "../util/systhr.h" #include "../util/atomic.h" #include "../util/io.h" #include "event.h" #include "event_linux.h" #include "httprequest.h" EVHandler* evhandler_create(EventHandlerConfig *cfg) { EVHandler *ev = malloc(sizeof(EVHandler)); ev->current = 0; ev->instances = calloc(cfg->nthreads, sizeof(void*)); ev->numins = cfg->nthreads; for(int i=0;i<cfg->nthreads;i++) { EventHandlerLinux *handler = malloc(sizeof(EventHandlerLinux)); memset(handler, 0, sizeof(EventHandlerLinux)); ev->instances[i] = (EventHandler*)handler; handler->ep = epoll_create(64); if(handler->ep < 0) { log_ereport(LOG_FAILURE, "evhandler_create: epoll_create: %s", strerror(errno)); return NULL; } handler->event_fd = eventfd(0, EFD_NONBLOCK); if(handler->event_fd < 0) { log_ereport(LOG_FAILURE, "evhandler_create: eventfd: %s", strerror(errno)); return NULL; } EventQueue *queue = malloc(sizeof(EventQueue)); if(!queue) { return NULL; } queue->numevents = 0; queue->next = NULL; handler->queue_begin = queue; handler->queue_end = queue; handler->num_reserve = 0; if(pthread_mutex_init(&handler->queue_lock, NULL)) { log_ereport(LOG_FAILURE, "evhandler_create: cannot initialize mutex"); return NULL; } struct epoll_event epev; epev.events = EPOLLIN | EPOLLET; // input event, edge triggered epev.data.ptr = NULL; if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->event_fd, &epev)) { log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno)); return NULL; } handler->thr = systhread_start( 0, 0, (thrstartfunc)ev_handle_events, handler); } return ev; } void ev_instance_wait(EventHandler *h) { EventHandlerLinux *ev = (EventHandlerLinux*)h; void *ret; pthread_join(ev->thr, &ret); } static volatile int ev_close = 0; void ev_instance_close(EventHandler *h) { EventHandlerLinux *ev = (EventHandlerLinux*)h; ev_close = 1; close(ev->ep); } // unique event addr that indicates shutdown static Event shutdown_event; void ev_instance_shutdown(EventHandler *h) { event_send(h, &shutdown_event); } void ev_handle_events(EventHandlerLinux *ev) { EventHandler *h = (EventHandler*)ev; int ep = ev->ep; struct epoll_event events[EV_MAX_EVENTS]; Event* finished[EV_MAX_EVENTS]; size_t queue_len = 0; int loop_ctn = 0; int ev_shutdown = 0; while(!ev_shutdown) { // if ev->event_queue contains events, we process them first // otherwise we get events from epoll int ret = 0; if(queue_len > 0) { pthread_mutex_lock(&ev->queue_lock); EventQueue *queue = ev->queue_begin; queue_len = queue->numevents; // queue_len cannot be bigger than EV_MAX_EVENTS // get events from the queue for(int i=0;i<queue_len;i++) { events[i].events = 0; events[i].data.ptr = queue->events[i]; } queue->numevents = 0; if(queue->next) { ev->queue_begin = ev->queue_begin->next; // more than 1 queue block available, remove first block if(ev->num_reserve < EV_QUEUE_RESERVE) { ev->reserve_block[ev->num_reserve++] = queue; queue->next = NULL; } else { free(queue); } } ret = queue_len; queue_len = ev->queue_begin->numevents; pthread_mutex_unlock(&ev->queue_lock); } else { // wait for events ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000); if(ret == -1) { if(errno != EINTR) { if(!ev_close) { log_ereport(LOG_CATASTROPHE, "epoll_wait failed: %s", strerror(errno)); } break; } continue; } } int numfinished = 0; ev->base.numret = 0; for(int i=0;i<ret;i++) { Event *event = events[i].data.ptr; if(!event) { // the only epoll_event without Event ptr is from eventfd uint64_t eventfd_r = 0; ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t)); if(r > 0) { queue_len = eventfd_r; } else { log_ereport(LOG_FAILURE, "eventhandler: eventfd read failed: %s", strerror(errno)); } continue; } if(event->fn) { int saved_ev = event->events; if(!event->fn(h, event)) { // event fn returned 0 -> remove event from epoll if(saved_ev && epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { event->error = 1; log_ereport( LOG_FAILURE, "epoll_ctl failed: fd: %d error: %s", event->object, strerror(errno)); } // if set, remember this event and // execute event->finish later if(event->finish) { finished[numfinished++] = event; //event->finish(ev, event); } } else { if(saved_ev != event->events) { // event type changed struct epoll_event epev; epev.events = EPOLLET; epev.data.ptr = event; // adjust epoll events epev.events = ev_convert2sys_events(event->events); if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) { log_ereport( LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); } } } } else if(event == &shutdown_event) { ev_instance_close(h); } } // call event finish handlers for(int i=0;i<numfinished;i++) { Event *event = finished[i]; // check again if the finish callback is set if(finished[i]->finish) { finished[i]->finish(h, event); } } // execute return calls for(int i=0;i<ev->base.numret;i++) { EVReturn ret = ev->base.fnreturn[i]; nsapi_saf_return(ret.sn, ret.rq, ret.ret); } if(ret == 0 || ++loop_ctn >= EV_IDLE_LOOP_CTN) { watchlist_check(&ev->base, 0); loop_ctn = 0; } } // epoll fd is already closed ev_queue_free(ev->queue_begin); pthread_mutex_destroy(&ev->queue_lock); close(ev->event_fd); free(ev); } void ev_queue_free(EventQueue *queue) { while(queue) { EventQueue *next = queue->next; free(queue); queue = next; } } int ev_convert2sys_events(int events) { int e = EPOLLET; if((events & EVENT_POLLIN) == EVENT_POLLIN) { e |= EPOLLIN; } if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { e |= EPOLLOUT; } return e; } int ev_pollin(EventHandler *h, int fd, Event *event) { EventHandlerLinux *ev = (EventHandlerLinux*)h; event->object = (intptr_t)fd; event->events = EVENT_POLLIN; struct epoll_event epev; epev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; // input event, edge triggered epev.data.ptr = event; return epoll_ctl(ev->ep, EPOLL_CTL_ADD, fd, &epev); } int ev_pollout(EventHandler *h, int fd, Event *event) { EventHandlerLinux *ev = (EventHandlerLinux*)h; event->object = (intptr_t)fd; event->events = EVENT_POLLOUT; struct epoll_event epev; epev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET; // input event, edge triggered epev.data.ptr = event; return epoll_ctl(ev->ep, EPOLL_CTL_ADD, fd, &epev); } int ev_remove_poll(EventHandler *h, int fd) { EventHandlerLinux *ev = (EventHandlerLinux*)h; return epoll_ctl(ev->ep, EPOLL_CTL_DEL, fd, NULL); } int event_send(EventHandler *h, Event *event) { EventHandlerLinux *ev = (EventHandlerLinux*)h; event->object = 0; event->events = 0; int err = 0; pthread_mutex_lock(&ev->queue_lock); // add event to the last block EventQueue *block = ev->queue_end; if(block->numevents >= EV_MAX_EVENTS) { // last block is full // create a new block or just use a reserved block if(ev->num_reserve > 0) { block = ev->reserve_block[ev->num_reserve-1]; ev->num_reserve--; } else { block = malloc(sizeof(EventQueue)); if(!block) { block = NULL; err = 1; } } if(block) { ev->queue_end->next = block; ev->queue_end = block; block->numevents = 0; block->next = NULL; } } if(block) { block->events[block->numevents++] = event; } pthread_mutex_unlock(&ev->queue_lock); if(!err) { uint64_t data = 1; ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t)); if(r == 0) { log_ereport(LOG_FAILURE, "eventhandler: failed to send event: %s", strerror(errno)); err = 1; } } return err; } // TODO: remove this fake aio int ev_aioread(int fd, aiocb_s *cb) { ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); cb->result = result; if(result < 0) { cb->result_errno = errno; } return event_send(cb->evhandler, cb->event); } int ev_aiowrite(int fd, aiocb_s *cb) { ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset); cb->result = result; if(result < 0) { cb->result_errno = errno; } return event_send(cb->evhandler, cb->event); } int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); } int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); } int event_removepoll(EventHandler *ev, SYS_NETFD fd) { return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); }