1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>

#include "../util/systhr.h"
#include "../util/atomic.h"

#include "../util/io.h"

#include "event.h"
#include "event_linux.h"
42
43
44 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
45 EVHandler *ev = malloc(
sizeof(EVHandler));
46 ev->current =
0;
47 ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
48 ev->numins = cfg->nthreads;
49
50 for(
int i=
0;i<cfg->nthreads;i++) {
51 EventHandler *handler = malloc(
sizeof(EventHandler));
52 ev->instances[i] = handler;
53
54 handler->ep = epoll_create(
64);
55 if(handler->ep ==
0) {
56
57 return NULL;
58 }
59
60 int eventpipe[
2];
61 if(pipe(eventpipe)) {
62 return NULL;
63 }
64 handler->eventin = eventpipe[
0];
65 handler->eventout = eventpipe[
1];
66
67 struct epoll_event epev;
68 epev.events =
EPOLLIN |
EPOLLET;
69 epev.data.ptr =
NULL;
70 if(epoll_ctl(handler->ep,
EPOLL_CTL_ADD, handler->eventin, &epev)) {
71 return NULL;
72 }
73
74 SYS_THREAD t = systhread_start(
75 0,
76 0,
77 (thrstartfunc)ev_handle_events,
78 handler);
79 systhread_detach(t);
80 }
81
82 return ev;
83 }
84
85
86 void ev_handle_events(EventHandler *ev) {
87 int ep = ev->ep;
88
89
90 struct epoll_event events[
16];
91
92 for(;;) {
93
94 int ret = epoll_wait(ep, events,
16,
100000);
95 if(ret == -
1 && errno !=
EINTR) {
96 log_ereport(
LOG_FAILURE,
"epoll_wait failed: %s", strerror(errno));
97 continue;
98 }
99
100 for(
int i=
0;i<ret;i++) {
101 Event *event = events[i].data.ptr;
102 if(!event) {
103 char ebuf[
sizeof(Event*)];
104 int ebufpos =
0;
105 char *b = ebuf;
106 while(ebufpos <
sizeof(Event*)) {
107 ssize_t r = read(ev->eventin, b + ebufpos,
sizeof(Event*)-ebufpos);
108 if(r <
0) {
109 break;
110 }
111 ebufpos += r;
112 }
113 if(ebufpos ==
sizeof(Event*)) {
114 intptr_t *p = (
intptr_t*)b;
115 *(&event) = (Event*)*p;
116 if(event->fn) {
117 if(!event->fn(ev, event) && event->finish) {
118 event->finish(ev, event);
119 }
120 }
121 }
122 }
else if(event->fn) {
123 int saved_ev = event->events;
124 if(!event->fn(ev, event)) {
125
126 if(epoll_ctl(ep,
EPOLL_CTL_DEL, event->object,
NULL)) {
127 event->error =
1;
128 log_ereport(
129 LOG_FAILURE,
130 "epoll_ctl failed: fd: %d error: %s",
131 event->object,
132 strerror(errno));
133 }
134
135
136 if(event->finish) {
137 event->finish(ev, event);
138 }
139 }
else {
140 if(saved_ev != event->events) {
141
142 struct epoll_event epev;
143 epev.events =
EPOLLET;
144 epev.data.ptr = event;
145
146
147 epev.events = ev_convert2sys_events(event->events);
148
149 if(epoll_ctl(ep,
EPOLL_CTL_MOD, event->object, &epev)) {
150 log_ereport(
151 LOG_FAILURE,
152 "epoll_wait failed: %s",
153 strerror(errno));
154 }
155 }
156 }
157 }
158 }
159 }
160 }
161
162 int ev_convert2sys_events(
int events) {
163 int e =
EPOLLET;
164 if((events &
EVENT_POLLIN) ==
EVENT_POLLIN) {
165 e |=
EPOLLIN;
166 }
167 if((events &
EVENT_POLLOUT) ==
EVENT_POLLOUT) {
168 e |=
EPOLLOUT;
169 }
170 return e;
171 }
172
173 int ev_pollin(EventHandler *h,
int fd, Event *event) {
174 event->object = (
intptr_t)fd;
175 event->events =
EVENT_POLLIN;
176 struct epoll_event epev;
177 epev.events =
EPOLLIN |
EPOLLET;
178 epev.data.ptr = event;
179 return epoll_ctl(h->ep,
EPOLL_CTL_ADD, fd, &epev);
180 }
181
182 int ev_pollout(EventHandler *h,
int fd, Event *event) {
183 event->object = (
intptr_t)fd;
184 event->events =
EVENT_POLLOUT;
185 struct epoll_event epev;
186 epev.events =
EPOLLOUT |
EPOLLET;
187 epev.data.ptr = event;
188 return epoll_ctl(h->ep,
EPOLL_CTL_ADD, fd, &epev);
189 }
190
191 int ev_remove_poll(EventHandler *h,
int fd) {
192 return epoll_ctl(h->ep,
EPOLL_CTL_DEL, fd,
NULL);
193 }
194
195 int event_send(EventHandler *h, Event *event) {
196 event->object =
0;
197 event->events =
0;
198 ssize_t r = write(h->eventout, &event,
sizeof(Event*));
199 if(r <
sizeof(Event*)) {
200 log_ereport(
LOG_FAILURE,
"failed to send event: %s", strerror(errno));
201 }
202 return r >
0 ?
0 :
1;
203 }
204
205
206 int ev_aioread(
int fd, aiocb_s *cb) {
207 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
208 cb->result = result;
209 if(result <
0) {
210 cb->result_errno = errno;
211 }
212 return event_send(cb->evhandler, cb->event);
213 }
214
215 int ev_aiowrite(
int fd, aiocb_s *cb) {
216 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
217 cb->result = result;
218 if(result <
0) {
219 cb->result_errno = errno;
220 }
221 return event_send(cb->evhandler, cb->event);
222 }
223
224
225 int event_pollin(EventHandler *ev,
SYS_NETFD fd, Event *event) {
226 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_IN, event);
227 }
228
229 int event_pollout(EventHandler *ev,
SYS_NETFD fd, Event *event) {
230 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_OUT, event);
231 }
232
233 int event_removepoll(EventHandler *ev,
SYS_NETFD fd) {
234 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_NONE,
NULL);
235 }
236