28 |
28 |
29 #include <stdio.h> |
29 #include <stdio.h> |
30 #include <stdlib.h> |
30 #include <stdlib.h> |
31 #include <errno.h> |
31 #include <errno.h> |
32 #include <sys/epoll.h> |
32 #include <sys/epoll.h> |
|
33 #include <sys/eventfd.h> |
33 |
34 |
34 #include "../util/systhr.h" |
35 #include "../util/systhr.h" |
35 #include "../util/atomic.h" |
36 #include "../util/atomic.h" |
36 |
37 |
|
38 #include "../util/io.h" |
37 |
39 |
38 #include "event.h" |
40 #include "event.h" |
39 #include "event_linux.h" |
41 #include "event_linux.h" |
40 |
42 |
41 |
43 |
42 event_handler_t* evhandler_create(int numthreads) { |
44 EVHandler* evhandler_create(EventHandlerConfig *cfg) { |
43 event_handler_t *ev = malloc(sizeof(event_handler_t)); |
45 EVHandler *ev = malloc(sizeof(EVHandler)); |
44 if(ev == NULL) { |
46 ev->current = 0; |
45 return NULL; |
47 ev->instances = calloc(cfg->nthreads, sizeof(void*)); |
46 } |
48 ev->numins = cfg->nthreads; |
47 |
49 |
48 ev->ep = calloc(numthreads, sizeof(int)); |
50 for(int i=0;i<cfg->nthreads;i++) { |
49 if(ev->ep == NULL) { |
51 EventHandler *handler = malloc(sizeof(EventHandler)); |
50 free(ev); |
52 ev->instances[i] = handler; |
51 return NULL; |
53 |
52 } |
54 handler->ep = epoll_create(64); |
53 ev->nep = numthreads; |
55 if(handler->ep == 0) { |
54 ev->lep = 0; |
56 // TODO: error |
55 |
|
56 /* create ports event threads */ |
|
57 for(int i=0;i<numthreads;i++) { |
|
58 /* create port */ |
|
59 ev->ep[i] = epoll_create(64); |
|
60 if(ev->ep[i] == 0) { |
|
61 free(ev->ep); |
|
62 free(ev); |
|
63 return NULL; |
57 return NULL; |
64 } |
58 } |
65 |
59 |
66 /* |
60 int eventpipe[2]; |
67 * start a new handler thread |
61 if(pipe(eventpipe)) { |
68 * the thread needs the event port and a pointer to the event handler |
|
69 */ |
|
70 ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); |
|
71 if(conf == NULL) { |
|
72 free(ev->ep); |
|
73 free(ev); |
|
74 return NULL; |
62 return NULL; |
75 } |
63 } |
76 conf->handler = ev; |
64 handler->eventin = eventpipe[0]; |
77 conf->ep = ev->ep[i]; |
65 handler->eventout = eventpipe[1]; |
78 |
66 |
79 systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); |
67 struct epoll_event epev; |
80 /* TODO: error handling */ |
68 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered |
|
69 epev.data.ptr = NULL; |
|
70 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) { |
|
71 return NULL; |
|
72 } |
|
73 |
|
74 SYS_THREAD t = systhread_start( |
|
75 0, |
|
76 0, |
|
77 (thrstartfunc)ev_handle_events, |
|
78 handler); |
|
79 systhread_detach(t); |
81 } |
80 } |
82 |
81 |
83 return ev; |
82 return ev; |
84 } |
83 } |
85 |
84 |
86 |
85 |
87 void ev_handle_events(ev_thr_conf_t *conf) { |
86 void ev_handle_events(EventHandler *ev) { |
88 event_handler_t *ev = conf->handler; |
87 int ep = ev->ep; |
89 int ep = conf->ep; |
|
90 |
|
91 free(conf); |
|
92 |
88 |
93 //port_event_t events[16]; |
89 //port_event_t events[16]; |
94 struct epoll_event events[16]; |
90 struct epoll_event events[16]; |
95 |
91 |
96 for(;;) { |
92 for(;;) { |
100 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); |
96 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); |
101 continue; |
97 continue; |
102 } |
98 } |
103 |
99 |
104 for(int i=0;i<ret;i++) { |
100 for(int i=0;i<ret;i++) { |
105 event_t *event = events[i].data.ptr; |
101 Event *event = events[i].data.ptr; |
106 if(event->fn) { |
102 if(!event) { |
107 int saved_ev = event->poll; |
103 char ebuf[sizeof(Event*)]; |
|
104 int ebufpos = 0; |
|
105 char *b = ebuf; |
|
106 while(ebufpos < sizeof(Event*)) { |
|
107 ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos); |
|
108 if(r < 0) { |
|
109 break; |
|
110 } |
|
111 ebufpos += r; |
|
112 } |
|
113 if(ebufpos == sizeof(Event*)) { |
|
114 intptr_t *p = (intptr_t*)b; |
|
115 *(&event) = (Event*)*p; |
|
116 if(event->fn) { |
|
117 if(!event->fn(ev, event) && event->finish) { |
|
118 event->finish(ev, event); |
|
119 } |
|
120 } |
|
121 } |
|
122 } else if(event->fn) { |
|
123 int saved_ev = event->events; |
108 if(!event->fn(ev, event)) { |
124 if(!event->fn(ev, event)) { |
109 // event fn returned 0 -> remove event from epoll |
125 // event fn returned 0 -> remove event from epoll |
110 if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { |
126 if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { |
111 event->error = 1; |
127 event->error = 1; |
112 log_ereport( |
128 log_ereport( |
119 // if set, execute event->finish |
135 // if set, execute event->finish |
120 if(event->finish) { |
136 if(event->finish) { |
121 event->finish(ev, event); |
137 event->finish(ev, event); |
122 } |
138 } |
123 } else { |
139 } else { |
124 if(saved_ev != event->poll) { |
140 if(saved_ev != event->events) { |
125 // event type changed |
141 // event type changed |
126 struct epoll_event epev; |
142 struct epoll_event epev; |
127 epev.events = EPOLLET; |
143 epev.events = EPOLLET; |
128 epev.data.ptr = event; |
144 epev.data.ptr = event; |
129 |
145 |
130 // adjust epoll events |
146 // adjust epoll events |
131 if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { |
147 epev.events = ev_convert2sys_events(event->events); |
132 epev.events |= EPOLLIN; |
|
133 } |
|
134 if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) { |
|
135 epev.events |= EPOLLOUT; |
|
136 } |
|
137 |
148 |
138 if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, NULL)) { |
149 if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) { |
139 log_ereport( |
150 log_ereport( |
140 LOG_FAILURE, |
151 LOG_FAILURE, |
141 "epoll_wait failed: %s", |
152 "epoll_wait failed: %s", |
142 strerror(errno)); |
153 strerror(errno)); |
143 } |
154 } |
146 } |
157 } |
147 } |
158 } |
148 } |
159 } |
149 } |
160 } |
150 |
161 |
151 /* returns a event handler port */ |
162 int ev_convert2sys_events(int events) { |
152 int ev_get_port(event_handler_t *h) { |
163 int e = EPOLLET; |
153 int nps = h->nep; |
164 if((events & EVENT_POLLIN) == EVENT_POLLIN) { |
154 if(nps == 1) { |
165 e |= EPOLLIN; |
155 return h->ep[0]; |
166 } |
156 } |
167 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { |
157 |
168 e |= EPOLLOUT; |
158 int cp = h->lep % nps; |
169 } |
159 ws_atomic_inc32(&h->lep); |
170 return e; |
160 |
171 } |
161 return h->ep[cp]; |
172 |
162 } |
173 int ev_pollin(EventHandler *h, int fd, Event *event) { |
163 |
|
164 int ev_pollin(event_handler_t *h, int fd, event_t *event) { |
|
165 event->object = (intptr_t)fd; |
174 event->object = (intptr_t)fd; |
166 event->poll = EVENT_POLLIN; |
175 event->events = EVENT_POLLIN; |
167 struct epoll_event epev; |
176 struct epoll_event epev; |
168 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered |
177 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered |
169 epev.data.ptr = event; |
178 epev.data.ptr = event; |
170 return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); |
179 return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); |
171 } |
180 } |
172 |
181 |
173 int ev_pollout(event_handler_t *h, int fd, event_t *event) { |
182 int ev_pollout(EventHandler *h, int fd, Event *event) { |
174 event->object = (intptr_t)fd; |
183 event->object = (intptr_t)fd; |
175 event->poll = EVENT_POLLOUT; |
184 event->events = EVENT_POLLOUT; |
176 struct epoll_event epev; |
185 struct epoll_event epev; |
177 epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered |
186 epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered |
178 epev.data.ptr = event; |
187 epev.data.ptr = event; |
179 return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); |
188 return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev); |
180 } |
189 } |
181 |
190 |
182 int evt_send(event_handler_t *h, event_t *event) { |
191 int ev_remove_poll(EventHandler *h, int fd) { |
|
192 return epoll_ctl(h->ep, EPOLL_CTL_DEL, fd, NULL); |
|
193 } |
|
194 |
|
195 int event_send(EventHandler *h, Event *event) { |
183 event->object = 0; |
196 event->object = 0; |
184 // TODO: implement using threadpool or eventfd |
197 event->events = 0; |
185 fprintf(stderr, "Warning: evt_send not implemented\n"); |
198 ssize_t r = write(h->eventout, &event, sizeof(Event*)); |
186 return 0; |
199 if(r < sizeof(Event*)) { |
187 } |
200 log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno)); |
|
201 } |
|
202 return r > 0 ? 0 : 1; |
|
203 } |
|
204 |
|
205 // TODO: remove this fake aio |
|
206 int ev_aioread(int fd, aiocb_s *cb) { |
|
207 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset); |
|
208 cb->result = result; |
|
209 if(result < 0) { |
|
210 cb->result_errno = errno; |
|
211 } |
|
212 return event_send(cb->evhandler, cb->event); |
|
213 } |
|
214 |
|
215 int ev_aiowrite(int fd, aiocb_s *cb) { |
|
216 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset); |
|
217 cb->result = result; |
|
218 if(result < 0) { |
|
219 cb->result_errno = errno; |
|
220 } |
|
221 return event_send(cb->evhandler, cb->event); |
|
222 } |
|
223 |
|
224 |
|
225 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { |
|
226 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); |
|
227 } |
|
228 |
|
229 int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { |
|
230 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); |
|
231 } |
|
232 |
|
233 int event_removepoll(EventHandler *ev, SYS_NETFD fd) { |
|
234 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); |
|
235 } |