1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <stdio.h>
30 #include <stdlib.h>
31
32 #include "../util/atomic.h"
33 #include "../util/io.h"
34
35 #include "event_bsd.h"
36
37 #include "httprequest.h"
38
39 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
40 EVHandler *ev = malloc(
sizeof(EVHandler));
41 ev->current =
0;
42 ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
43 ev->numins = cfg->nthreads;
44
45 for(
int i=
0;i<cfg->nthreads;i++) {
46 EventHandlerKqueue *handler = malloc(
sizeof(EventHandlerKqueue));
47 memset(handler,
0,
sizeof(EventHandlerKqueue));
48 ev->instances[i] = handler;
49
50 handler->kqueue = kqueue();
51 if(handler->kqueue <
0) {
52 log_ereport(
LOG_FAILURE,
"evhandler_create: kqueue: %s", strerror(errno));
53 return NULL;
54 }
55
56 handler->thr = systhread_start(
57 0,
58 0,
59 (thrstartfunc)ev_handle_events,
60 handler);
61 }
62
63 return ev;
64 }
65
66 void ev_instance_wait(EventHandler *h) {
67 EventHandlerLinux *ev = (EventHandlerLinux*)h;
68 void *ret;
69 pthread_join(ev->thr, &ret);
70 }
71
72 static volatile int ev_close =
0;
73
74 void ev_instance_close(EventHandler *h) {
75 EventHandlerKqueue *ev = (EventHandlerKqueue*)h;
76 close(ev->kqueue);
77 ev_close =
1;
78 }
79
80
81 static Event shutdown_event;
82 void ev_instance_shutdown(EventHandler *h) {
83 event_send(h, &shutdown_event);
84 }
85
86 void ev_handle_events(EventHandlerKqueue *ev) {
87 EventHandler *h = (EventHandler*)ev;
88 struct timespec timeout;
89 timeout.tv_nsec =
0;
90 timeout.tv_sec =
600;
91
92 struct kevent events[
EV_MAX_EVENTS];
93 struct kevent changes[
EV_MAX_EVENTS*
2];
94 Event *finished[
EV_MAX_EVENTS];
95 int numchanges =
0;
96 int numfinished;
97
98 for(;;) {
99
100 int nev = kevent(ev->kqueue, changes, numchanges, events,
EV_MAX_EVENTS, &timeout);
101 if(nev == -
1) {
102 if(errno !=
EINTR) {
103 if(!ev_close) {
104 log_ereport(
LOG_CATASTROPHE,
"kevent failed: %s", strerror(errno));
105 }
106 break;
107 }
108 continue;
109 }
110
111 numchanges =
0;
112 int numfinished =
0;
113 ev->numret =
0;
114 for(
int i=
0;i<nev;i++) {
115 Event *event = (Event*)events[i].udata;
116 if(!event) {
117 if(events[i].flags ==
0) {
118 log_ereport(
LOG_WARN,
"Unknown kevent (ident=%d)", (
int)events[i].ident);
119 }
120
121
122
123
124
125 continue;
126 }
127 int event_events = event->events;
128
129 if(event->fn) {
130 int saved_ev = event->events;
131 if(!event->fn(h, event)) {
132
133
134 if(event->finish) {
135 finished[numfinished++] = event;
136 }
137
138 event_events =
0;
139 }
else {
140 event_events = event->events;
141 }
142
143
144 if(saved_ev != event_events) {
145 int e = event_events;
146 int e_fd = events[i].ident;
147 if((e &
EVENT_POLLIN) != (saved_ev &
EVENT_POLLIN)) {
148 if((e &
EVENT_POLLIN) ==
EVENT_POLLIN) {
149
150 EV_SET(&changes[numchanges++], e_fd,
EVFILT_READ,
EV_ADD,
0,
0, event);
151 }
else {
152
153 EV_SET(&changes[numchanges++], e_fd,
EVFILT_READ,
EV_DELETE,
0,
0,
NULL);
154 }
155 }
156 if((e &
EVENT_POLLOUT) != (saved_ev &
EVENT_POLLOUT)) {
157 if((e &
EVENT_POLLOUT) ==
EVENT_POLLOUT) {
158
159 EV_SET(&changes[numchanges++], e_fd,
EVFILT_WRITE,
EV_ADD,
0,
0, event);
160 }
else {
161
162 EV_SET(&changes[numchanges++], e_fd,
EVFILT_WRITE,
EV_DELETE,
0,
0,
NULL);
163 }
164 }
165 }
166 }
else if(event == &shutdown_event) {
167 ev_instance_close(h);
168 }
169 }
170
171 for(
int i=
0;i<numfinished;i++) {
172 Event *event = finished[i];
173
174 if(finished[i]->finish) {
175 finished[i]->finish(h, event);
176 }
177 }
178
179 for(
int i=
0;i<ev->base.numret;i++) {
180 EVReturn ret = ev->base.fnreturn[i];
181 nsapi_saf_return(ret.sn, ret.rq, ret.ret);
182 }
183 }
184
185 free(ev);
186 }
187
188 int ev_pollin(EventHandler *h,
int fd, Event *event) {
189 event->events =
EVENT_POLLIN;
190 struct kevent kev;
191 EV_SET(&kev, fd,
EVFILT_READ,
EV_ADD,
0,
0, event);
192 return kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
193 }
194
195 int ev_pollout(EventHandler *h,
int fd, Event *event) {
196 event->events =
EVENT_POLLOUT;
197 struct kevent kev;
198 EV_SET(&kev, fd,
EVFILT_WRITE,
EV_ADD,
0,
0, event);
199 return kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
200 }
201
202 int ev_remove_poll(EventHandler *h,
int fd) {
203 struct kevent kev;
204 EV_SET(&kev, fd,
EVFILT_READ,
EV_DELETE,
0,
0,
NULL);
205 int r1 = kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
206 EV_SET(&kev, fd,
EVFILT_WRITE,
EV_DELETE,
0,
0,
NULL);
207 int r2 = kevent(h->kqueue, &kev,
1,
NULL,
0,
NULL);
208
209 return r1 != -
1 || r2 != -
1 ?
0 :
1;
210 }
211
212 int event_send(EventHandler *h, Event *event) {
213 return 0;
214 }
215
216
217 int ev_aioread(
int fd, aiocb_s *cb) {
218 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
219 cb->result = result;
220 if(result <
0) {
221 cb->result_errno = errno;
222 }
223 return event_send(cb->evhandler, cb->event);
224 }
225
226 int ev_aiowrite(
int fd, aiocb_s *cb) {
227 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
228 cb->result = result;
229 if(result <
0) {
230 cb->result_errno = errno;
231 }
232 return event_send(cb->evhandler, cb->event);
233 }
234
235
236 int event_pollin(EventHandler *ev,
SYS_NETFD fd, Event *event) {
237 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_IN, event);
238 }
239
240 int event_pollout(EventHandler *ev,
SYS_NETFD fd, Event *event) {
241 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_OUT, event);
242 }
243
244 int event_removepoll(EventHandler *ev,
SYS_NETFD fd) {
245 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_NONE,
NULL);
246 }
247