1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <errno.h>
32 #include <sys/epoll.h>
33 #include <sys/eventfd.h>
34 #include <signal.h>
35
36 #include <cx/array_list.h>
37 #include <cx/linked_list.h>
38
39 #include "../util/systhr.h"
40 #include "../util/atomic.h"
41
42 #include "../util/io.h"
43
44 #include "event.h"
45 #include "event_linux.h"
46
47 #include "httprequest.h"
48
49
50 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
51 EVHandler *ev = malloc(
sizeof(EVHandler));
52 ev->current =
0;
53 ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
54 ev->numins = cfg->nthreads;
55
56 for(
int i=
0;i<cfg->nthreads;i++) {
57 EventHandlerLinux *handler = malloc(
sizeof(EventHandlerLinux));
58 memset(handler,
0,
sizeof(EventHandlerLinux));
59 ev->instances[i] = (EventHandler*)handler;
60
61 handler->ep = epoll_create(
64);
62 if(handler->ep <
0) {
63 log_ereport(
LOG_FAILURE,
"evhandler_create: epoll_create: %s", strerror(errno));
64 return NULL;
65 }
66
67 handler->event_fd = eventfd(
0,
EFD_NONBLOCK);
68 if(handler->event_fd <
0) {
69 log_ereport(
LOG_FAILURE,
"evhandler_create: eventfd: %s", strerror(errno));
70 return NULL;
71 }
72
73 EventQueue *queue = malloc(
sizeof(EventQueue));
74 if(!queue) {
75 return NULL;
76 }
77 queue->numevents =
0;
78 queue->next =
NULL;
79 handler->queue_begin = queue;
80 handler->queue_end = queue;
81 handler->num_reserve =
0;
82
83 if(pthread_mutex_init(&handler->queue_lock,
NULL)) {
84 log_ereport(
LOG_FAILURE,
"evhandler_create: cannot initialize mutex");
85 return NULL;
86 }
87
88 struct epoll_event epev;
89 epev.events =
EPOLLIN |
EPOLLET;
90 epev.data.ptr =
NULL;
91 if(epoll_ctl(handler->ep,
EPOLL_CTL_ADD, handler->event_fd, &epev)) {
92 log_ereport(
LOG_FAILURE,
"evhandler_create: epoll_ctl: %s", strerror(errno));
93 return NULL;
94 }
95
96 handler->thr = systhread_start(
97 0,
98 0,
99 (thrstartfunc)ev_handle_events,
100 handler);
101 }
102
103 return ev;
104 }
105
106 void ev_instance_wait(EventHandler *h) {
107 EventHandlerLinux *ev = (EventHandlerLinux*)h;
108 void *ret;
109 pthread_join(ev->thr, &ret);
110 }
111
112 static volatile int ev_close =
0;
113
114 void ev_instance_close(EventHandler *h) {
115 EventHandlerLinux *ev = (EventHandlerLinux*)h;
116 ev_close =
1;
117 close(ev->ep);
118 }
119
120
121 static Event shutdown_event;
122 void ev_instance_shutdown(EventHandler *h) {
123 event_send(h, &shutdown_event);
124 }
125
126 void ev_handle_events(EventHandlerLinux *ev) {
127 EventHandler *h = (EventHandler*)ev;
128 int ep = ev->ep;
129
130 struct epoll_event events[
EV_MAX_EVENTS];
131 Event* finished[
EV_MAX_EVENTS];
132
133 size_t queue_len =
0;
134
135 int loop_ctn =
0;
136 int ev_shutdown =
0;
137 while(!ev_shutdown) {
138
139
140 int ret =
0;
141 if(queue_len >
0) {
142 pthread_mutex_lock(&ev->queue_lock);
143
144 EventQueue *queue = ev->queue_begin;
145 queue_len = queue->numevents;
146
147
148
149 for(
int i=
0;i<queue_len;i++) {
150 events[i].events =
0;
151 events[i].data.ptr = queue->events[i];
152 }
153
154 queue->numevents =
0;
155 if(queue->next) {
156 ev->queue_begin = ev->queue_begin->next;
157
158
159 if(ev->num_reserve <
EV_QUEUE_RESERVE) {
160 ev->reserve_block[ev->num_reserve++] = queue;
161 queue->next =
NULL;
162 }
else {
163 free(queue);
164 }
165 }
166
167 ret = queue_len;
168 queue_len = ev->queue_begin->numevents;
169
170 pthread_mutex_unlock(&ev->queue_lock);
171 }
else {
172
173 ret = epoll_wait(ep, events,
EV_MAX_EVENTS,
EV_IDLE_TIMEOUT *
1000);
174 if(ret == -
1) {
175 if(errno !=
EINTR) {
176 if(!ev_close) {
177 log_ereport(
LOG_CATASTROPHE,
"epoll_wait failed: %s", strerror(errno));
178 }
179 break;
180 }
181 continue;
182 }
183 }
184
185 int numfinished =
0;
186 ev->base.numret =
0;
187 for(
int i=
0;i<ret;i++) {
188 Event *event = events[i].data.ptr;
189 if(!event) {
190
191
192 uint64_t eventfd_r =
0;
193 ssize_t r = read(ev->event_fd, &eventfd_r,
sizeof(
uint64_t));
194 if(r >
0) {
195 queue_len = eventfd_r;
196 }
else {
197 log_ereport(
LOG_FAILURE,
"eventhandler: eventfd read failed: %s", strerror(errno));
198 }
199
200 continue;
201 }
202
203 if(event->fn) {
204 int saved_ev = event->events;
205 if(!event->fn(h, event)) {
206
207 if(saved_ev && epoll_ctl(ep,
EPOLL_CTL_DEL, event->object,
NULL)) {
208 event->error =
1;
209 log_ereport(
210 LOG_FAILURE,
211 "epoll_ctl failed: fd: %d error: %s",
212 event->object,
213 strerror(errno));
214 }
215
216
217
218 if(event->finish) {
219 finished[numfinished++] = event;
220
221 }
222 }
else {
223 if(saved_ev != event->events) {
224
225 struct epoll_event epev;
226 epev.events =
EPOLLET;
227 epev.data.ptr = event;
228
229
230 epev.events = ev_convert2sys_events(event->events);
231
232 if(epoll_ctl(ep,
EPOLL_CTL_MOD, event->object, &epev)) {
233 log_ereport(
234 LOG_FAILURE,
235 "epoll_wait failed: %s",
236 strerror(errno));
237 }
238 }
239 }
240 }
else if(event == &shutdown_event) {
241 ev_instance_close(h);
242 }
243 }
244
245 for(
int i=
0;i<numfinished;i++) {
246 Event *event = finished[i];
247
248 if(finished[i]->finish) {
249 finished[i]->finish(h, event);
250 }
251 }
252
253 for(
int i=
0;i<ev->base.numret;i++) {
254 EVReturn ret = ev->base.fnreturn[i];
255 nsapi_saf_return(ret.sn, ret.rq, ret.ret);
256 }
257
258 if(ret ==
0 || ++loop_ctn >=
EV_IDLE_LOOP_CTN) {
259 watchlist_check(&ev->base,
0);
260 loop_ctn =
0;
261 }
262 }
263
264
265
266 ev_queue_free(ev->queue_begin);
267 pthread_mutex_destroy(&ev->queue_lock);
268 close(ev->event_fd);
269 free(ev);
270 }
271
272 void ev_queue_free(EventQueue *queue) {
273 while(queue) {
274 EventQueue *next = queue->next;
275 free(queue);
276 queue = next;
277 }
278 }
279
280 int ev_convert2sys_events(
int events) {
281 int e =
EPOLLET;
282 if((events &
EVENT_POLLIN) ==
EVENT_POLLIN) {
283 e |=
EPOLLIN;
284 }
285 if((events &
EVENT_POLLOUT) ==
EVENT_POLLOUT) {
286 e |=
EPOLLOUT;
287 }
288 return e;
289 }
290
291 int ev_pollin(EventHandler *h,
int fd, Event *event) {
292 EventHandlerLinux *ev = (EventHandlerLinux*)h;
293 event->object = (
intptr_t)fd;
294 event->events =
EVENT_POLLIN;
295 struct epoll_event epev;
296 epev.events =
EPOLLIN |
EPOLLRDHUP |
EPOLLET;
297 epev.data.ptr = event;
298 return epoll_ctl(ev->ep,
EPOLL_CTL_ADD, fd, &epev);
299 }
300
301 int ev_pollout(EventHandler *h,
int fd, Event *event) {
302 EventHandlerLinux *ev = (EventHandlerLinux*)h;
303 event->object = (
intptr_t)fd;
304 event->events =
EVENT_POLLOUT;
305 struct epoll_event epev;
306 epev.events =
EPOLLOUT |
EPOLLRDHUP |
EPOLLET;
307 epev.data.ptr = event;
308 return epoll_ctl(ev->ep,
EPOLL_CTL_ADD, fd, &epev);
309 }
310
311 int ev_remove_poll(EventHandler *h,
int fd) {
312 EventHandlerLinux *ev = (EventHandlerLinux*)h;
313 return epoll_ctl(ev->ep,
EPOLL_CTL_DEL, fd,
NULL);
314 }
315
316 int event_send(EventHandler *h, Event *event) {
317 EventHandlerLinux *ev = (EventHandlerLinux*)h;
318 event->object =
0;
319 event->events =
0;
320
321 int err =
0;
322 pthread_mutex_lock(&ev->queue_lock);
323
324
325 EventQueue *block = ev->queue_end;
326 if(block->numevents >=
EV_MAX_EVENTS) {
327
328
329 if(ev->num_reserve >
0) {
330 block = ev->reserve_block[ev->num_reserve-
1];
331 ev->num_reserve--;
332 }
else {
333 block = malloc(
sizeof(EventQueue));
334 if(!block) {
335 block =
NULL;
336 err =
1;
337 }
338 }
339
340 if(block) {
341 ev->queue_end->next = block;
342 ev->queue_end = block;
343
344 block->numevents =
0;
345 block->next =
NULL;
346 }
347 }
348
349 if(block) {
350 block->events[block->numevents++] = event;
351 }
352
353
354 pthread_mutex_unlock(&ev->queue_lock);
355
356 if(!err) {
357 uint64_t data =
1;
358 ssize_t r = write(ev->event_fd, &data,
sizeof(
uint64_t));
359 if(r ==
0) {
360 log_ereport(
LOG_FAILURE,
"eventhandler: failed to send event: %s", strerror(errno));
361 err =
1;
362 }
363 }
364
365 return err;
366 }
367
368
369 int ev_aioread(
int fd, aiocb_s *cb) {
370 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
371 cb->result = result;
372 if(result <
0) {
373 cb->result_errno = errno;
374 }
375 return event_send(cb->evhandler, cb->event);
376 }
377
378 int ev_aiowrite(
int fd, aiocb_s *cb) {
379 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
380 cb->result = result;
381 if(result <
0) {
382 cb->result_errno = errno;
383 }
384 return event_send(cb->evhandler, cb->event);
385 }
386
387
388 int event_pollin(EventHandler *ev,
SYS_NETFD fd, Event *event) {
389 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_IN, event);
390 }
391
392 int event_pollout(EventHandler *ev,
SYS_NETFD fd, Event *event) {
393 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_OUT, event);
394 }
395
396 int event_removepoll(EventHandler *ev,
SYS_NETFD fd) {
397 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_NONE,
NULL);
398 }
399