UNIXworkcode

1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER. 3 * 4 * Copyright 2013 Olaf Wintermann. All rights reserved. 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions are met: 8 * 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 17 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE 20 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 21 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 22 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 23 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 24 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 25 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 26 * POSSIBILITY OF SUCH DAMAGE. 27 */ 28 29 #include <stdio.h> 30 #include <stdlib.h> 31 #include <errno.h> 32 #include <sys/epoll.h> 33 #include <sys/eventfd.h> 34 35 #include "../util/systhr.h" 36 #include "../util/atomic.h" 37 38 #include "../util/io.h" 39 40 #include "event.h" 41 #include "event_linux.h" 42 43 44 EVHandler* evhandler_create(EventHandlerConfig *cfg) { 45 EVHandler *ev = malloc(sizeof(EVHandler)); 46 ev->current = 0; 47 ev->instances = calloc(cfg->nthreads, sizeof(void*)); 48 ev->numins = cfg->nthreads; 49 50 for(int i=0;i<cfg->nthreads;i++) { 51 EventHandler *handler = malloc(sizeof(EventHandler)); 52 ev->instances[i] = handler; 53 54 handler->ep = epoll_create(64); 55 if(handler->ep == 0) { 56 // TODO: error 57 return NULL; 58 } 59 60 int eventpipe[2]; 61 if(pipe(eventpipe)) { 62 return NULL; 63 } 64 handler->eventin = eventpipe[0]; 65 handler->eventout = eventpipe[1]; 66 67 struct epoll_event epev; 68 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered 69 epev.data.ptr = NULL; 70 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) { 71 return NULL; 72 } 73 74 SYS_THREAD t = systhread_start( 75 0, 76 0, 77 (thrstartfunc)ev_handle_events, 78 handler); 79 systhread_detach(t); 80 } 81 82 return ev; 83 } 84 85 86 void ev_handle_events(EventHandler *ev) { 87 int ep = ev->ep; 88 89 //port_event_t events[16]; 90 struct epoll_event events[16]; 91 92 for(;;) { 93 /* wait for events */ 94 int ret = epoll_wait(ep, events, 16, 100000); 95 if(ret == -1 && errno != EINTR) { 96 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); 97 continue; 98 } 99 100 for(int i=0;i<ret;i++) { 101 Event *event = events[i].data.ptr; 102 if(!event) { 103 char ebuf[sizeof(Event*)]; 104 int ebufpos = 0; 105 char *b = ebuf; 106 while(ebufpos < sizeof(Event*)) { 107 ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos); 108 if(r < 0) { 109 break; 110 } 111 ebufpos += r; 112 } 113 if(ebufpos == sizeof(Event*)) { 114 intptr_t *p = (intptr_t*)b; 115 *(&event) = (Event*)*p; 116 if(event->fn) { 117 if(!event->fn(ev, event) && event->finish) { 118 event->finish(ev, event); 119 } 120 } 121 } 122 } else if(event->fn) { 123 int saved_ev = event->events; 124 if(!event->fn(ev, event)) { 125 // event fn returned 0 -> remove event from epoll 126 if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { 127 event->error = 1; 128 log_ereport( 129 LOG_FAILURE, 130 "epoll_ctl failed: fd: %d error: %s", 131 event->object, 132 strerror(errno)); 133 } 134 135 // if set, execute event->finish 136 if(event->finish) { 137 event->finish(ev, event); 138 } 139 } else { 140 if(saved_ev != event->events) { 141 // event type changed 142 struct epoll_event epev; 143 epev.events = EPOLLET; 144 epev.data.ptr = event; 145 146 // adjust epoll events 147 epev.events = ev_convert2sys_events(event->events); 148 149 if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) { 150 log_ereport( 151 LOG_FAILURE, 152 "epoll_wait failed: %s", 153 strerror(errno)); 154 } 155 } 156 } 157 } 158 } 159 } 160 } 161 162 int ev_convert2sys_events(int events) { 163 int e = EPOLLET; 164 if((events & EVENT_POLLIN) == EVENT_POLLIN) { 165 e |= EPOLLIN; 166 } 167 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { 168 e |= EPOLLOUT; 169 } 170 return e; 171 } 172 173 int ev_pollin(EventHandler *h, int fd, Event *event) { 174 event->object = (intptr_t)fd; 175 event->events = EVENT_POLLIN; 176 struct epoll_event epev; 177 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered 178 epev.data.ptr = event; 179 return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); 180 } 181 182 int ev_pollout(EventHandler *h, int fd, Event *event) { 183 event->object = (intptr_t)fd; 184 event->events = EVENT_POLLOUT; 185 struct epoll_event epev; 186 epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered 187 epev.data.ptr = event; 188 return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); 189 } 190 191 int ev_remove_poll(EventHandler *h, int fd) { 192 return epoll_ctl(h->ep, EPOLL_CTL_DEL, fd, NULL); 193 } 194 195 int event_send(EventHandler *h, Event *event) { 196 event->object = 0; 197 event->events = 0; 198 ssize_t r = write(h->eventout, &event, sizeof(Event*)); 199 if(r < sizeof(Event*)) { 200 log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno)); 201 } 202 return r > 0 ? 0 : 1; 203 } 204 205 // TODO: remove this fake aio 206 int ev_aioread(int fd, aiocb_s *cb) { 207 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); 208 cb->result = result; 209 if(result < 0) { 210 cb->result_errno = errno; 211 } 212 return event_send(cb->evhandler, cb->event); 213 } 214 215 int ev_aiowrite(int fd, aiocb_s *cb) { 216 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset); 217 cb->result = result; 218 if(result < 0) { 219 cb->result_errno = errno; 220 } 221 return event_send(cb->evhandler, cb->event); 222 } 223 224 225 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { 226 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); 227 } 228 229 int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { 230 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); 231 } 232 233 int event_removepoll(EventHandler *ev, SYS_NETFD fd) { 234 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); 235 } 236