src/server/daemon/event_linux.c

changeset 545
720893ec7d48
parent 543
3335f431a91b
child 547
280bf87c8689
equal deleted inserted replaced
544:27684460629f 545:720893ec7d48
30 #include <stdlib.h> 30 #include <stdlib.h>
31 #include <errno.h> 31 #include <errno.h>
32 #include <sys/epoll.h> 32 #include <sys/epoll.h>
33 #include <sys/eventfd.h> 33 #include <sys/eventfd.h>
34 34
35 #include <cx/array_list.h>
36 #include <cx/linked_list.h>
37
35 #include "../util/systhr.h" 38 #include "../util/systhr.h"
36 #include "../util/atomic.h" 39 #include "../util/atomic.h"
37 40
38 #include "../util/io.h" 41 #include "../util/io.h"
39 42
58 if(handler->ep < 0) { 61 if(handler->ep < 0) {
59 log_ereport(LOG_FAILURE, "evhandler_create: epoll_create: %s", strerror(errno)); 62 log_ereport(LOG_FAILURE, "evhandler_create: epoll_create: %s", strerror(errno));
60 return NULL; 63 return NULL;
61 } 64 }
62 65
63 int eventpipe[2]; 66 handler->event_fd = eventfd(0, EFD_NONBLOCK);
64 if(pipe(eventpipe)) { 67 if(handler->event_fd < 0) {
65 log_ereport(LOG_FAILURE, "evhandler_create: pipe: %s", strerror(errno)); 68 log_ereport(LOG_FAILURE, "evhandler_create: eventfd: %s", strerror(errno));
66 return NULL; 69 return NULL;
67 } 70 }
68 handler->eventin = eventpipe[0]; 71
69 handler->eventout = eventpipe[1]; 72 EventQueue *queue = malloc(sizeof(EventQueue));
73 if(!queue) {
74 return NULL;
75 }
76 queue->numevents = 0;
77 queue->next = NULL;
78 handler->queue_begin = queue;
79 handler->queue_end = queue;
80 handler->num_reserve = 0;
81
82 if(pthread_mutex_init(&handler->queue_lock, NULL)) {
83 log_ereport(LOG_FAILURE, "evhandler_create: cannot initialize mutex");
84 return NULL;
85 }
70 86
71 struct epoll_event epev; 87 struct epoll_event epev;
72 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered 88 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered
73 epev.data.ptr = NULL; 89 epev.data.ptr = NULL;
74 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) { 90 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->event_fd, &epev)) {
75 log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno)); 91 log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno));
76 return NULL; 92 return NULL;
77 } 93 }
78 94
79 SYS_THREAD t = systhread_start( 95 SYS_THREAD t = systhread_start(
92 int ep = ev->ep; 108 int ep = ev->ep;
93 109
94 struct epoll_event events[EV_MAX_EVENTS]; 110 struct epoll_event events[EV_MAX_EVENTS];
95 Event* finished[EV_MAX_EVENTS]; 111 Event* finished[EV_MAX_EVENTS];
96 112
113 size_t queue_len = 0;
114
97 int loop_ctn = 0; 115 int loop_ctn = 0;
98 for(;;) { 116 for(;;) {
99 /* wait for events */ 117 // if ev->event_queue contains events, we process them first
100 int ret = epoll_wait(ep, events, 16, EV_IDLE_TIMEOUT * 1000); 118 // otherwise we get events from epoll
101 if(ret == -1 && errno != EINTR) { 119 int ret = 0;
102 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); 120 if(queue_len > 0) {
103 continue; 121 pthread_mutex_lock(&ev->queue_lock);
122
123 EventQueue *queue = ev->queue_begin;
124 queue_len = queue->numevents;
125
126 // queue_len cannot be bigger than EV_MAX_EVENTS
127 // get events from the queue
128 for(int i=0;i<queue_len;i++) {
129 events[i].events = 0;
130 events[i].data.ptr = queue->events[i];
131 }
132
133 queue->numevents = 0;
134 if(queue->next) {
135 ev->queue_begin = ev->queue_begin->next;
136
137 // more than 1 queue block available, remove first block
138 if(ev->num_reserve < EV_QUEUE_RESERVE) {
139 ev->reserve_block[ev->num_reserve++] = queue;
140 queue->next = NULL;
141 } else {
142 free(queue);
143 }
144 }
145
146 ret = queue_len;
147 queue_len = ev->queue_begin->numevents;
148
149 pthread_mutex_unlock(&ev->queue_lock);
150 } else {
151 // wait for events
152 ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000);
153 if(ret == -1 && errno != EINTR) {
154 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno));
155 continue;
156 }
104 } 157 }
105 158
106 int numfinished = 0; 159 int numfinished = 0;
107 ev->base.numret = 0; 160 ev->base.numret = 0;
108 for(int i=0;i<ret;i++) { 161 for(int i=0;i<ret;i++) {
109 Event *event = events[i].data.ptr; 162 Event *event = events[i].data.ptr;
110 if(!event) { 163 if(!event) {
111 char ebuf[sizeof(Event*)]; 164 // the only epoll_event without Event ptr is from eventfd
112 int ebufpos = 0; 165
113 char *b = ebuf; 166 uint64_t eventfd_r = 0;
114 while(ebufpos < sizeof(Event*)) { 167 ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t));
115 ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos); 168 if(r > 0) {
116 if(r < 0) { 169 queue_len = eventfd_r;
117 break; 170 } else {
118 } 171 log_ereport(LOG_FAILURE, "eventhandler: eventfd read failed: %s", strerror(errno));
119 ebufpos += r;
120 } 172 }
121 if(ebufpos == sizeof(Event*)) { 173
122 intptr_t *p = (intptr_t*)b; 174 continue;
123 *(&event) = (Event*)*p;
124 /*
125 if(event->fn) {
126 if(!event->fn(ev, event) && event->finish) {
127 event->finish(ev, event);
128 }
129 }*/
130 } else {
131 continue; // should not happen
132 }
133 } 175 }
134 176
135 if(event->fn) { 177 if(event->fn) {
136 int saved_ev = event->events; 178 int saved_ev = event->events;
137 if(!event->fn(h, event)) { 179 if(!event->fn(h, event)) {
230 272
231 int event_send(EventHandler *h, Event *event) { 273 int event_send(EventHandler *h, Event *event) {
232 EventHandlerLinux *ev = (EventHandlerLinux*)h; 274 EventHandlerLinux *ev = (EventHandlerLinux*)h;
233 event->object = 0; 275 event->object = 0;
234 event->events = 0; 276 event->events = 0;
235 ssize_t r = write(ev->eventout, &event, sizeof(Event*)); 277
236 if(r < sizeof(Event*)) { 278 int err = 0;
237 log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno)); 279 pthread_mutex_lock(&ev->queue_lock);
238 } 280
239 return r > 0 ? 0 : 1; 281 // add event to the last block
282 EventQueue *block = ev->queue_end;
283 if(block->numevents >= EV_MAX_EVENTS) {
284 // last block is full
285 // create a new block or just use a reserved block
286 if(ev->num_reserve > 0) {
287 block = ev->reserve_block[ev->num_reserve-1];
288 ev->num_reserve--;
289 } else {
290 block = malloc(sizeof(EventQueue));
291 if(!block) {
292 block = NULL;
293 err = 1;
294 }
295 }
296
297 if(block) {
298 ev->queue_end->next = block;
299 ev->queue_end = block;
300
301 block->numevents = 0;
302 block->next = NULL;
303 }
304 }
305
306 if(block) {
307 block->events[block->numevents++] = event;
308 }
309
310
311 pthread_mutex_unlock(&ev->queue_lock);
312
313 if(!err) {
314 uint64_t data = 1;
315 ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t));
316 if(r == 0) {
317 log_ereport(LOG_FAILURE, "eventhandler: failed to send event: %s", strerror(errno));
318 err = 1;
319 }
320 }
321
322 return err;
240 } 323 }
241 324
242 // TODO: remove this fake aio 325 // TODO: remove this fake aio
243 int ev_aioread(int fd, aiocb_s *cb) { 326 int ev_aioread(int fd, aiocb_s *cb) {
244 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); 327 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);

mercurial