101 } |
101 } |
102 |
102 |
103 return ev; |
103 return ev; |
104 } |
104 } |
105 |
105 |
|
106 static volatile int ev_close = 0; |
|
107 |
|
108 void ev_instance_close(EventHandler *h) { |
|
109 EventHandlerLinux *ev = (EventHandlerLinux*)h; |
|
110 ev_close = 1; |
|
111 close(ev->ep); |
|
112 } |
|
113 |
|
114 // unique event addr that indicates shutdown |
|
115 static Event shutdown_event; |
|
116 void ev_instance_shutdown(EventHandler *h) { |
|
117 event_send(h, &shutdown_event); |
|
118 } |
|
119 |
106 void ev_handle_events(EventHandlerLinux *ev) { |
120 void ev_handle_events(EventHandlerLinux *ev) { |
107 EventHandler *h = (EventHandler*)ev; |
121 EventHandler *h = (EventHandler*)ev; |
108 int ep = ev->ep; |
122 int ep = ev->ep; |
109 |
123 |
110 struct epoll_event events[EV_MAX_EVENTS]; |
124 struct epoll_event events[EV_MAX_EVENTS]; |
111 Event* finished[EV_MAX_EVENTS]; |
125 Event* finished[EV_MAX_EVENTS]; |
112 |
126 |
113 size_t queue_len = 0; |
127 size_t queue_len = 0; |
114 |
128 |
115 int loop_ctn = 0; |
129 int loop_ctn = 0; |
116 for(;;) { |
130 int ev_shutdown = 0; |
|
131 while(!ev_shutdown) { |
117 // if ev->event_queue contains events, we process them first |
132 // if ev->event_queue contains events, we process them first |
118 // otherwise we get events from epoll |
133 // otherwise we get events from epoll |
119 int ret = 0; |
134 int ret = 0; |
120 if(queue_len > 0) { |
135 if(queue_len > 0) { |
121 pthread_mutex_lock(&ev->queue_lock); |
136 pthread_mutex_lock(&ev->queue_lock); |
148 |
163 |
149 pthread_mutex_unlock(&ev->queue_lock); |
164 pthread_mutex_unlock(&ev->queue_lock); |
150 } else { |
165 } else { |
151 // wait for events |
166 // wait for events |
152 ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000); |
167 ret = epoll_wait(ep, events, EV_MAX_EVENTS, EV_IDLE_TIMEOUT * 1000); |
153 if(ret == -1 && errno != EINTR) { |
168 if(ret == -1) { |
154 log_ereport(LOG_FAILURE, "epoll_wait failed: %s", strerror(errno)); |
169 if(errno != EINTR) { |
|
170 if(!ev_close) { |
|
171 log_ereport(LOG_CATASTROPHE, "epoll_wait failed: %s", strerror(errno)); |
|
172 } |
|
173 break; |
|
174 } |
155 continue; |
175 continue; |
156 } |
176 } |
157 } |
177 } |
158 |
178 |
159 int numfinished = 0; |
179 int numfinished = 0; |
209 "epoll_wait failed: %s", |
229 "epoll_wait failed: %s", |
210 strerror(errno)); |
230 strerror(errno)); |
211 } |
231 } |
212 } |
232 } |
213 } |
233 } |
|
234 } else if(event == &shutdown_event) { |
|
235 ev_instance_close(h); |
214 } |
236 } |
215 } |
237 } |
216 // call event finish handlers |
238 // call event finish handlers |
217 for(int i=0;i<numfinished;i++) { |
239 for(int i=0;i<numfinished;i++) { |
218 Event *event = finished[i]; |
240 Event *event = finished[i]; |
229 |
251 |
230 if(ret == 0 || ++loop_ctn >= EV_IDLE_LOOP_CTN) { |
252 if(ret == 0 || ++loop_ctn >= EV_IDLE_LOOP_CTN) { |
231 watchlist_check(&ev->base, 0); |
253 watchlist_check(&ev->base, 0); |
232 loop_ctn = 0; |
254 loop_ctn = 0; |
233 } |
255 } |
|
256 } |
|
257 |
|
258 // epoll fd is already closed |
|
259 |
|
260 ev_queue_free(ev->queue_begin); |
|
261 pthread_mutex_destroy(&ev->queue_lock); |
|
262 close(ev->event_fd); |
|
263 free(ev); |
|
264 } |
|
265 |
|
266 void ev_queue_free(EventQueue *queue) { |
|
267 while(queue) { |
|
268 EventQueue *next = queue->next; |
|
269 free(queue); |
|
270 queue = next; |
234 } |
271 } |
235 } |
272 } |
236 |
273 |
237 int ev_convert2sys_events(int events) { |
274 int ev_convert2sys_events(int events) { |
238 int e = EPOLLET; |
275 int e = EPOLLET; |