UNIXworkcode

/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
 *
 * Copyright 2013 Olaf Wintermann. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 *   1. Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above copyright
 *      notice, this list of conditions and the following disclaimer in the
 *      documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
27 */ 28 29 #include <stdio.h> 30 #include <stdlib.h> 31 #include <errno.h> 32 #include <sys/epoll.h> 33 #include <sys/eventfd.h> 34 #include <signal.h> 35 36 #include <cx/array_list.h> 37 #include <cx/linked_list.h> 38 39 #include "../util/systhr.h" 40 #include "../util/atomic.h" 41 42 #include "../util/io.h" 43 44 #include "event.h" 45 #include "event_linux.h" 46 47 #include "httprequest.h" 48 49 50 EVHandler* evhandler_create(EventHandlerConfig *cfg) { 51 EVHandler *ev = malloc(sizeof(EVHandler)); 52 ev->current = 0; 53 ev->instances = calloc(cfg->nthreads, sizeof(void*)); 54 ev->numins = cfg->nthreads; 55 56 for(int i=0;i<cfg->nthreads;i++) { 57 EventHandlerLinux *handler = malloc(sizeof(EventHandlerLinux)); 58 memset(handler, 0, sizeof(EventHandlerLinux)); 59 ev->instances[i] = (EventHandler*)handler; 60 61 handler->ep = epoll_create(64); 62 if(handler->ep < 0) { 63 log_ereport(LOG_FAILURE, "evhandler_create: epoll_create: %s", strerror(errno)); 64 return NULL; 65 } 66 67 handler->event_fd = eventfd(0, EFD_NONBLOCK); 68 if(handler->event_fd < 0) { 69 log_ereport(LOG_FAILURE, "evhandler_create: eventfd: %s", strerror(errno)); 70 return NULL; 71 } 72 73 EventQueue *queue = malloc(sizeof(EventQueue)); 74 if(!queue) { 75 return NULL; 76 } 77 queue->numevents = 0; 78 queue->next = NULL; 79 handler->queue_begin = queue; 80 handler->queue_end = queue; 81 handler->num_reserve = 0; 82 83 if(pthread_mutex_init(&handler->queue_lock, NULL)) { 84 log_ereport(LOG_FAILURE, "evhandler_create: cannot initialize mutex"); 85 return NULL; 86 } 87 88 struct epoll_event epev; 89 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered 90 epev.data.ptr = NULL; 91 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->event_fd, &epev)) { 92 log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno)); 93 return NULL; 94 } 95 96 handler->thr = systhread_start( 97 0, 98 0, 99 (thrstartfunc)ev_handle_events, 100 handler); 101 } 102 103 return ev; 104 } 105 106 void 
ev_instance_wait(EventHandler *h) { 107 EventHandlerLinux *ev = (EventHandlerLinux*)h; 108 void *ret; 109 pthread_join(ev->thr, &ret); 110 } 111 112 static volatile int ev_close = 0; 113 114 void ev_instance_close(EventHandler *h) { 115 EventHandlerLinux *ev = (EventHandlerLinux*)h; 116 ev_close = 1; 117 close(ev->ep); 118 } 119 120 // unique event addr that indicates shutdown 121 static Event shutdown_event; 122 void ev_instance_shutdown(EventHandler *h) { 123 event_send(h, &shutdown_event); 124 } 125 126 void ev_handle_events(EventHandlerLinux *ev) { 127 EventHandler *h = (EventHandler*)ev; 128 int ep = ev->ep; 129 130 struct epoll_event events[EV_MAX_EVENTS]; 131 Event* finished[EV_MAX_EVENTS]; 132 133 size_t queue_len = 0; 134 135 int loop_ctn = 0; 136 int ev_shutdown = 0; 137 while(!ev_shutdown) { 138 // if ev->event_queue contains events, we process them first 139 // otherwise we get events from epoll 140 int ret = 0; 141 if(queue_len > 0) { 142 pthread_mutex_lock(&ev->queue_lock); 143 144 EventQueue *queue = ev->queue_begin; 145 queue_len = queue->numevents; 146 147 // queue_len cannot be bigger than EV_MAX_EVENTS 148 // get events from the queue 149 for(int i=0;i<queue_len;i++) { 150 events[i].events = 0; 151 events[i].data.ptr = queue->events[i]; 152 } 153 154 queue->numevents = 0; 155 if(queue->next) { 156 ev->queue_begin = ev->queue_begin->next; 157 158 // more than 1 queue block available, remove first block 159 if(ev->num_reserve < EV_QUEUE_RESERVE) { 160 ev->reserve_block[ev->num_reserve++] = queue; 161 queue->next = NULL; 162 } else { 163 free(queue); 164 } 165 } 166 167 ret = queue_len; 168 queue_len = ev->queue_begin->numevents; 169 170 pthread_mutex_unlock(&ev->queue_lock); 171 } else { 172 // wait for events 173 ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000); 174 if(ret == -1) { 175 if(errno != EINTR) { 176 if(!ev_close) { 177 log_ereport(LOG_CATASTROPHE, "epoll_wait failed: %s", strerror(errno)); 178 } 179 break; 180 } 181 
continue; 182 } 183 } 184 185 int numfinished = 0; 186 ev->base.numret = 0; 187 for(int i=0;i<ret;i++) { 188 Event *event = events[i].data.ptr; 189 if(!event) { 190 // the only epoll_event without Event ptr is from eventfd 191 192 uint64_t eventfd_r = 0; 193 ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t)); 194 if(r > 0) { 195 queue_len = eventfd_r; 196 } else { 197 log_ereport(LOG_FAILURE, "eventhandler: eventfd read failed: %s", strerror(errno)); 198 } 199 200 continue; 201 } 202 203 if(event->fn) { 204 int saved_ev = event->events; 205 if(!event->fn(h, event)) { 206 // event fn returned 0 -> remove event from epoll 207 if(saved_ev && epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { 208 event->error = 1; 209 log_ereport( 210 LOG_FAILURE, 211 "epoll_ctl failed: fd: %d error: %s", 212 event->object, 213 strerror(errno)); 214 } 215 216 // if set, remember this event and 217 // execute event->finish later 218 if(event->finish) { 219 finished[numfinished++] = event; 220 //event->finish(ev, event); 221 } 222 } else { 223 if(saved_ev != event->events) { 224 // event type changed 225 struct epoll_event epev; 226 epev.events = EPOLLET; 227 epev.data.ptr = event; 228 229 // adjust epoll events 230 epev.events = ev_convert2sys_events(event->events); 231 232 if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) { 233 log_ereport( 234 LOG_FAILURE, 235 "epoll_wait failed: %s", 236 strerror(errno)); 237 } 238 } 239 } 240 } else if(event == &shutdown_event) { 241 ev_instance_close(h); 242 } 243 } 244 // call event finish handlers 245 for(int i=0;i<numfinished;i++) { 246 Event *event = finished[i]; 247 // check again if the finish callback is set 248 if(finished[i]->finish) { 249 finished[i]->finish(h, event); 250 } 251 } 252 // execute return calls 253 for(int i=0;i<ev->base.numret;i++) { 254 EVReturn ret = ev->base.fnreturn[i]; 255 nsapi_saf_return(ret.sn, ret.rq, ret.ret); 256 } 257 258 if(ret == 0 || ++loop_ctn >= EV_IDLE_LOOP_CTN) { 259 
watchlist_check(&ev->base, 0); 260 loop_ctn = 0; 261 } 262 } 263 264 // epoll fd is already closed 265 266 ev_queue_free(ev->queue_begin); 267 pthread_mutex_destroy(&ev->queue_lock); 268 close(ev->event_fd); 269 free(ev); 270 } 271 272 void ev_queue_free(EventQueue *queue) { 273 while(queue) { 274 EventQueue *next = queue->next; 275 free(queue); 276 queue = next; 277 } 278 } 279 280 int ev_convert2sys_events(int events) { 281 int e = EPOLLET; 282 if((events & EVENT_POLLIN) == EVENT_POLLIN) { 283 e |= EPOLLIN; 284 } 285 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { 286 e |= EPOLLOUT; 287 } 288 return e; 289 } 290 291 int ev_pollin(EventHandler *h, int fd, Event *event) { 292 EventHandlerLinux *ev = (EventHandlerLinux*)h; 293 event->object = (intptr_t)fd; 294 event->events = EVENT_POLLIN; 295 struct epoll_event epev; 296 epev.events = EPOLLIN | EPOLLRDHUP | EPOLLET; // input event, edge triggered 297 epev.data.ptr = event; 298 return epoll_ctl(ev->ep, EPOLL_CTL_ADD, fd, &epev); 299 } 300 301 int ev_pollout(EventHandler *h, int fd, Event *event) { 302 EventHandlerLinux *ev = (EventHandlerLinux*)h; 303 event->object = (intptr_t)fd; 304 event->events = EVENT_POLLOUT; 305 struct epoll_event epev; 306 epev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET; // input event, edge triggered 307 epev.data.ptr = event; 308 return epoll_ctl(ev->ep, EPOLL_CTL_ADD, fd, &epev); 309 } 310 311 int ev_remove_poll(EventHandler *h, int fd) { 312 EventHandlerLinux *ev = (EventHandlerLinux*)h; 313 return epoll_ctl(ev->ep, EPOLL_CTL_DEL, fd, NULL); 314 } 315 316 int event_send(EventHandler *h, Event *event) { 317 EventHandlerLinux *ev = (EventHandlerLinux*)h; 318 event->object = 0; 319 event->events = 0; 320 321 int err = 0; 322 pthread_mutex_lock(&ev->queue_lock); 323 324 // add event to the last block 325 EventQueue *block = ev->queue_end; 326 if(block->numevents >= EV_MAX_EVENTS) { 327 // last block is full 328 // create a new block or just use a reserved block 329 if(ev->num_reserve > 0) { 
330 block = ev->reserve_block[ev->num_reserve-1]; 331 ev->num_reserve--; 332 } else { 333 block = malloc(sizeof(EventQueue)); 334 if(!block) { 335 block = NULL; 336 err = 1; 337 } 338 } 339 340 if(block) { 341 ev->queue_end->next = block; 342 ev->queue_end = block; 343 344 block->numevents = 0; 345 block->next = NULL; 346 } 347 } 348 349 if(block) { 350 block->events[block->numevents++] = event; 351 } 352 353 354 pthread_mutex_unlock(&ev->queue_lock); 355 356 if(!err) { 357 uint64_t data = 1; 358 ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t)); 359 if(r == 0) { 360 log_ereport(LOG_FAILURE, "eventhandler: failed to send event: %s", strerror(errno)); 361 err = 1; 362 } 363 } 364 365 return err; 366 } 367 368 // TODO: remove this fake aio 369 int ev_aioread(int fd, aiocb_s *cb) { 370 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); 371 cb->result = result; 372 if(result < 0) { 373 cb->result_errno = errno; 374 } 375 return event_send(cb->evhandler, cb->event); 376 } 377 378 int ev_aiowrite(int fd, aiocb_s *cb) { 379 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset); 380 cb->result = result; 381 if(result < 0) { 382 cb->result_errno = errno; 383 } 384 return event_send(cb->evhandler, cb->event); 385 } 386 387 388 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { 389 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); 390 } 391 392 int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { 393 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); 394 } 395 396 int event_removepoll(EventHandler *ev, SYS_NETFD fd) { 397 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); 398 } 399