src/server/daemon/event_linux.c

changeset 193
aa8393527b1e
parent 162
b169992137a8
parent 192
6a145e13d933
child 443
ef3c8a0e1fee
equal deleted inserted replaced
183:f33974f0dce0 193:aa8393527b1e
28 28
29 #include <stdio.h> 29 #include <stdio.h>
30 #include <stdlib.h> 30 #include <stdlib.h>
31 #include <errno.h> 31 #include <errno.h>
32 #include <sys/epoll.h> 32 #include <sys/epoll.h>
33 #include <sys/eventfd.h>
33 34
34 #include "../util/systhr.h" 35 #include "../util/systhr.h"
35 #include "../util/atomic.h" 36 #include "../util/atomic.h"
36 37
38 #include "../util/io.h"
37 39
38 #include "event.h" 40 #include "event.h"
39 #include "event_linux.h" 41 #include "event_linux.h"
40 42
41 43
42 event_handler_t* evhandler_create(int numthreads) { 44 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
43 event_handler_t *ev = malloc(sizeof(event_handler_t)); 45 EVHandler *ev = malloc(sizeof(EVHandler));
44 if(ev == NULL) { 46 ev->current = 0;
45 return NULL; 47 ev->instances = calloc(cfg->nthreads, sizeof(void*));
46 } 48 ev->numins = cfg->nthreads;
47 49
48 ev->ep = calloc(numthreads, sizeof(int)); 50 for(int i=0;i<cfg->nthreads;i++) {
49 if(ev->ep == NULL) { 51 EventHandler *handler = malloc(sizeof(EventHandler));
50 free(ev); 52 ev->instances[i] = handler;
51 return NULL; 53
52 } 54 handler->ep = epoll_create(64);
53 ev->nep = numthreads; 55 if(handler->ep == 0) {
54 ev->lep = 0; 56 // TODO: error
55
56 /* create ports event threads */
57 for(int i=0;i<numthreads;i++) {
58 /* create port */
59 ev->ep[i] = epoll_create(64);
60 if(ev->ep[i] == 0) {
61 free(ev->ep);
62 free(ev);
63 return NULL; 57 return NULL;
64 } 58 }
65 59
66 /* 60 int eventpipe[2];
67 * start a new handler thread 61 if(pipe(eventpipe)) {
68 * the thread needs the event port and a pointer to the event handler
69 */
70 ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t));
71 if(conf == NULL) {
72 free(ev->ep);
73 free(ev);
74 return NULL; 62 return NULL;
75 } 63 }
76 conf->handler = ev; 64 handler->eventin = eventpipe[0];
77 conf->ep = ev->ep[i]; 65 handler->eventout = eventpipe[1];
78 66
79 systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); 67 struct epoll_event epev;
80 /* TODO: error handling */ 68 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered
69 epev.data.ptr = NULL;
70 if(epoll_ctl(handler->ep, EPOLL_CTL_ADD, handler->eventin, &epev)) {
71 return NULL;
72 }
73
74 SYS_THREAD t = systhread_start(
75 0,
76 0,
77 (thrstartfunc)ev_handle_events,
78 handler);
79 systhread_detach(t);
81 } 80 }
82 81
83 return ev; 82 return ev;
84 } 83 }
85 84
86 85
87 void ev_handle_events(ev_thr_conf_t *conf) { 86 void ev_handle_events(EventHandler *ev) {
88 event_handler_t *ev = conf->handler; 87 int ep = ev->ep;
89 int ep = conf->ep;
90
91 free(conf);
92 88
93 //port_event_t events[16]; 89 //port_event_t events[16];
94 struct epoll_event events[16]; 90 struct epoll_event events[16];
95 91
96 for(;;) { 92 for(;;) {
100 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); 96 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno));
101 continue; 97 continue;
102 } 98 }
103 99
104 for(int i=0;i<ret;i++) { 100 for(int i=0;i<ret;i++) {
105 event_t *event = events[i].data.ptr; 101 Event *event = events[i].data.ptr;
106 if(event->fn) { 102 if(!event) {
107 int saved_ev = event->poll; 103 char ebuf[sizeof(Event*)];
104 int ebufpos = 0;
105 char *b = ebuf;
106 while(ebufpos < sizeof(Event*)) {
107 ssize_t r = read(ev->eventin, b + ebufpos, sizeof(Event*)-ebufpos);
108 if(r < 0) {
109 break;
110 }
111 ebufpos += r;
112 }
113 if(ebufpos == sizeof(Event*)) {
114 intptr_t *p = (intptr_t*)b;
115 *(&event) = (Event*)*p;
116 if(event->fn) {
117 if(!event->fn(ev, event) && event->finish) {
118 event->finish(ev, event);
119 }
120 }
121 }
122 } else if(event->fn) {
123 int saved_ev = event->events;
108 if(!event->fn(ev, event)) { 124 if(!event->fn(ev, event)) {
109 // event fn returned 0 -> remove event from epoll 125 // event fn returned 0 -> remove event from epoll
110 if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) { 126 if(epoll_ctl(ep, EPOLL_CTL_DEL, event->object, NULL)) {
111 event->error = 1; 127 event->error = 1;
112 log_ereport( 128 log_ereport(
119 // if set, execute event->finish 135 // if set, execute event->finish
120 if(event->finish) { 136 if(event->finish) {
121 event->finish(ev, event); 137 event->finish(ev, event);
122 } 138 }
123 } else { 139 } else {
124 if(saved_ev != event->poll) { 140 if(saved_ev != event->events) {
125 // event type changed 141 // event type changed
126 struct epoll_event epev; 142 struct epoll_event epev;
127 epev.events = EPOLLET; 143 epev.events = EPOLLET;
128 epev.data.ptr = event; 144 epev.data.ptr = event;
129 145
130 // adjust epoll events 146 // adjust epoll events
131 if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { 147 epev.events = ev_convert2sys_events(event->events);
132 epev.events |= EPOLLIN;
133 }
134 if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) {
135 epev.events |= EPOLLOUT;
136 }
137 148
138 if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, NULL)) { 149 if(epoll_ctl(ep, EPOLL_CTL_MOD, event->object, &epev)) {
139 log_ereport( 150 log_ereport(
140 LOG_FAILURE, 151 LOG_FAILURE,
141 "epoll_wait failed: %s", 152 "epoll_wait failed: %s",
142 strerror(errno)); 153 strerror(errno));
143 } 154 }
146 } 157 }
147 } 158 }
148 } 159 }
149 } 160 }
150 161
151 /* returns a event handler port */ 162 int ev_convert2sys_events(int events) {
152 int ev_get_port(event_handler_t *h) { 163 int e = EPOLLET;
153 int nps = h->nep; 164 if((events & EVENT_POLLIN) == EVENT_POLLIN) {
154 if(nps == 1) { 165 e |= EPOLLIN;
155 return h->ep[0]; 166 }
156 } 167 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
157 168 e |= EPOLLOUT;
158 int cp = h->lep % nps; 169 }
159 ws_atomic_inc32(&h->lep); 170 return e;
160 171 }
161 return h->ep[cp]; 172
162 } 173 int ev_pollin(EventHandler *h, int fd, Event *event) {
163
164 int ev_pollin(event_handler_t *h, int fd, event_t *event) {
165 event->object = (intptr_t)fd; 174 event->object = (intptr_t)fd;
166 event->poll = EVENT_POLLIN; 175 event->events = EVENT_POLLIN;
167 struct epoll_event epev; 176 struct epoll_event epev;
168 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered 177 epev.events = EPOLLIN | EPOLLET; // input event, edge triggered
169 epev.data.ptr = event; 178 epev.data.ptr = event;
170 return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); 179 return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev);
171 } 180 }
172 181
173 int ev_pollout(event_handler_t *h, int fd, event_t *event) { 182 int ev_pollout(EventHandler *h, int fd, Event *event) {
174 event->object = (intptr_t)fd; 183 event->object = (intptr_t)fd;
175 event->poll = EVENT_POLLOUT; 184 event->events = EVENT_POLLOUT;
176 struct epoll_event epev; 185 struct epoll_event epev;
177 epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered 186 epev.events = EPOLLOUT | EPOLLET; // input event, edge triggered
178 epev.data.ptr = event; 187 epev.data.ptr = event;
179 return epoll_ctl(ev_get_port(h), EPOLL_CTL_ADD, fd, &epev); 188 return epoll_ctl(h->ep, EPOLL_CTL_ADD, fd, &epev);
180 } 189 }
181 190
182 int evt_send(event_handler_t *h, event_t *event) { 191 int ev_remove_poll(EventHandler *h, int fd) {
192 return epoll_ctl(h->ep, EPOLL_CTL_DEL, fd, NULL);
193 }
194
195 int event_send(EventHandler *h, Event *event) {
183 event->object = 0; 196 event->object = 0;
184 // TODO: implement using threadpool or eventfd 197 event->events = 0;
185 fprintf(stderr, "Warning: evt_send not implemented\n"); 198 ssize_t r = write(h->eventout, &event, sizeof(Event*));
186 return 0; 199 if(r < sizeof(Event*)) {
187 } 200 log_ereport(LOG_FAILURE, "failed to send event: %s", strerror(errno));
201 }
202 return r > 0 ? 0 : 1;
203 }
204
205 // TODO: remove this fake aio
206 int ev_aioread(int fd, aiocb_s *cb) {
207 ssize_t result = pread(fd, cb->buf, cb->nbytes, cb->offset);
208 cb->result = result;
209 if(result < 0) {
210 cb->result_errno = errno;
211 }
212 return event_send(cb->evhandler, cb->event);
213 }
214
215 int ev_aiowrite(int fd, aiocb_s *cb) {
216 ssize_t result = pwrite(fd, cb->buf, cb->nbytes, cb->offset);
217 cb->result = result;
218 if(result < 0) {
219 cb->result_errno = errno;
220 }
221 return event_send(cb->evhandler, cb->event);
222 }
223
224
225 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
226 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event);
227 }
228
229 int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
230 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event);
231 }
232
233 int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
234 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL);
235 }

mercurial