58 if(handler->ep < 0) { |
61 if(handler->ep < 0) { |
59 log_ereport(LOG_FAILURE, "evhandler_create: epoll_create: %s", strerror(errno)); |
62 log_ereport(LOG_FAILURE, "evhandler_create: epoll_create: %s", strerror(errno)); |
60 return NULL; |
63 return NULL; |
61 } |
64 } |
62 |
65 |
63 int eventpipe[2]; |
66 handler->event_fd = eventfd(0, EFD_NONBLOCK); |
64 if(pipe(eventpipe)) { |
67 if(handler->event_fd < 0) { |
65 log_ereport(LOG_FAILURE, "evhandler_create: pipe: %s", strerror(errno)); |
68 log_ereport(LOG_FAILURE, "evhandler_create: eventfd: %s", strerror(errno)); |
66 return NULL; |
69 return NULL; |
67 } |
70 } |
68 handler->eventin = eventpipe[0]; |
71 |
69 handler->eventout = eventpipe[1]; |
72 EventQueue *queue = malloc(sizeof(EventQueue)); |
|
73 if(!queue) { |
|
74 return NULL; |
|
75 } |
|
76 queue->numevents = 0; |
|
77 queue->next = NULL; |
|
78 handler->queue_begin = queue; |
|
79 handler->queue_end = queue; |
|
80 handler->num_reserve = 0; |
|
81 |
|
82 if(pthread_mutex_init(&handler->queue_lock, NULL)) { |
|
83 log_ereport(LOG_FAILURE, "evhandler_create: cannot initialize mutex"); |
|
84 return NULL; |
|
85 } |
70 |
86 |
71 struct epoll_event epev; |
87 struct epoll_event epev; |
72 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered |
88 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered |
73 epev.data.ptr = NULL; |
89 epev.data.ptr = NULL; |
74 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) { |
90 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->event_fd, &epev)) { |
75 log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno)); |
91 log_ereport(LOG_FAILURE, "evhandler_create: epoll_ctl: %s", strerror(errno)); |
76 return NULL; |
92 return NULL; |
77 } |
93 } |
78 |
94 |
79 SYS_THREAD t = systhread_start( |
95 SYS_THREAD t = systhread_start( |
92 int ep = ev->ep; |
108 int ep = ev->ep; |
93 |
109 |
94 struct epoll_event events[EV_MAX_EVENTS]; |
110 struct epoll_event events[EV_MAX_EVENTS]; |
95 Event* finished[EV_MAX_EVENTS]; |
111 Event* finished[EV_MAX_EVENTS]; |
96 |
112 |
|
113 size_t queue_len = 0; |
|
114 |
97 int loop_ctn = 0; |
115 int loop_ctn = 0; |
98 for(;;) { |
116 for(;;) { |
99 /* wait for events */ |
117 // if ev->event_queue contains events, we process them first |
100 int ret = epoll_wait(ep, events, 16, EV_IDLE_TIMEOUT * 1000); |
118 // otherwise we get events from epoll |
101 if(ret == -1 && errno != EINTR) { |
119 int ret = 0; |
102 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); |
120 if(queue_len > 0) { |
103 continue; |
121 pthread_mutex_lock(&ev->queue_lock); |
|
122 |
|
123 EventQueue *queue = ev->queue_begin; |
|
124 queue_len = queue->numevents; |
|
125 |
|
126 // queue_len cannot be bigger than EV_MAX_EVENTS |
|
127 // get events from the queue |
|
128 for(int i=0;i<queue_len;i++) { |
|
129 events[i].events = 0; |
|
130 events[i].data.ptr = queue->events[i]; |
|
131 } |
|
132 |
|
133 queue->numevents = 0; |
|
134 if(queue->next) { |
|
135 ev->queue_begin = ev->queue_begin->next; |
|
136 |
|
137 // more than 1 queue block available, remove first block |
|
138 if(ev->num_reserve < EV_QUEUE_RESERVE) { |
|
139 ev->reserve_block[ev->num_reserve++] = queue; |
|
140 queue->next = NULL; |
|
141 } else { |
|
142 free(queue); |
|
143 } |
|
144 } |
|
145 |
|
146 ret = queue_len; |
|
147 queue_len = ev->queue_begin->numevents; |
|
148 |
|
149 pthread_mutex_unlock(&ev->queue_lock); |
|
150 } else { |
|
151 // wait for events |
|
152 ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000); |
|
153 if(ret == -1 && errno != EINTR) { |
|
154 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); |
|
155 continue; |
|
156 } |
104 } |
157 } |
105 |
158 |
106 int numfinished = 0; |
159 int numfinished = 0; |
107 ev->base.numret = 0; |
160 ev->base.numret = 0; |
108 for(int i=0;i<ret;i++) { |
161 for(int i=0;i<ret;i++) { |
109 Event *event = events[i].data.ptr; |
162 Event *event = events[i].data.ptr; |
110 if(!event) { |
163 if(!event) { |
111 char ebuf[sizeof(Event*)]; |
164 // the only epoll_event without Event ptr is from eventfd |
112 int ebufpos = 0; |
165 |
113 char *b = ebuf; |
166 uint64_t eventfd_r = 0; |
114 while(ebufpos < sizeof(Event*)) { |
167 ssize_t r = read(ev->event_fd, &eventfd_r, sizeof(uint64_t)); |
115 ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos); |
168 if(r > 0) { |
116 if(r < 0) { |
169 queue_len = eventfd_r; |
117 break; |
170 } else { |
118 } |
171 log_ereport(LOG_FAILURE, "eventhandler: eventfd read failed: %s", strerror(errno)); |
119 ebufpos += r; |
|
120 } |
172 } |
121 if(ebufpos == sizeof(Event*)) { |
173 |
122 intptr_t *p = (intptr_t*)b; |
174 continue; |
123 *(&event) = (Event*)*p; |
|
124 /* |
|
125 if(event->fn) { |
|
126 if(!event->fn(ev, event) && event->finish) { |
|
127 event->finish(ev, event); |
|
128 } |
|
129 }*/ |
|
130 } else { |
|
131 continue; // should not happen |
|
132 } |
|
133 } |
175 } |
134 |
176 |
135 if(event->fn) { |
177 if(event->fn) { |
136 int saved_ev = event->events; |
178 int saved_ev = event->events; |
137 if(!event->fn(h, event)) { |
179 if(!event->fn(h, event)) { |
230 |
272 |
231 int event_send(EventHandler *h, Event *event) { |
273 int event_send(EventHandler *h, Event *event) { |
232 EventHandlerLinux *ev = (EventHandlerLinux*)h; |
274 EventHandlerLinux *ev = (EventHandlerLinux*)h; |
233 event->object = 0; |
275 event->object = 0; |
234 event->events = 0; |
276 event->events = 0; |
235 ssize_t r = write(ev->eventout, &event, sizeof(Event*)); |
277 |
236 if(r < sizeof(Event*)) { |
278 int err = 0; |
237 log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno)); |
279 pthread_mutex_lock(&ev->queue_lock); |
238 } |
280 |
239 return r > 0 ? 0 : 1; |
281 // add event to the last block |
|
282 EventQueue *block = ev->queue_end; |
|
283 if(block->numevents >= EV_MAX_EVENTS) { |
|
284 // last block is full |
|
285 // create a new block or just use a reserved block |
|
286 if(ev->num_reserve > 0) { |
|
287 block = ev->reserve_block[ev->num_reserve-1]; |
|
288 ev->num_reserve--; |
|
289 } else { |
|
290 block = malloc(sizeof(EventQueue)); |
|
291 if(!block) { |
|
292 block = NULL; |
|
293 err = 1; |
|
294 } |
|
295 } |
|
296 |
|
297 if(block) { |
|
298 ev->queue_end->next = block; |
|
299 ev->queue_end = block; |
|
300 |
|
301 block->numevents = 0; |
|
302 block->next = NULL; |
|
303 } |
|
304 } |
|
305 |
|
306 if(block) { |
|
307 block->events[block->numevents++] = event; |
|
308 } |
|
309 |
|
310 |
|
311 pthread_mutex_unlock(&ev->queue_lock); |
|
312 |
|
313 if(!err) { |
|
314 uint64_t data = 1; |
|
315 ssize_t r = write(ev->event_fd, &data, sizeof(uint64_t)); |
|
316 if(r == 0) { |
|
317 log_ereport(LOG_FAILURE, "eventhandler: failed to send event: %s", strerror(errno)); |
|
318 err = 1; |
|
319 } |
|
320 } |
|
321 |
|
322 return err; |
240 } |
323 } |
241 |
324 |
242 // TODO: remove this fake aio |
325 // TODO: remove this fake aio |
243 int ev_aioread(int fd, aiocb_s *cb) { |
326 int ev_aioread(int fd, aiocb_s *cb) { |
244 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); |
327 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); |