1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <atomic.h>
32
33 #include "../util/io.h"
34
35 #include "event_solaris.h"
36
37 #include "httprequest.h"
38
39 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
40 EVHandler *ev = malloc(
sizeof(EVHandler));
41 ev->current =
0;
42 ev->instances = calloc(cfg->nthreads,
sizeof(
void*));
43 ev->numins = cfg->nthreads;
44
45 for(
int i=
0;i<cfg->nthreads;i++) {
46 EventHandlerSolaris *handler = malloc(
sizeof(EventHandlerSolaris));
47 memset(handler,
0,
sizeof(EventHandlerSolaris));
48 ev->instances[i] = (EventHandler*)handler;
49
50 handler->port = port_create();
51 if(handler->port <
0) {
52 log_ereport(
LOG_FAILURE,
"evhandler_create: port_create: %s", strerror(errno));
53 return NULL;
54 }
55
56 handler->thr = systhread_start(
57 0,
58 0,
59 (thrstartfunc)ev_handle_events,
60 handler);
61 }
62
63 return ev;
64 }
65
66 void ev_instance_wait(EventHandler *h) {
67 EventHandlerLinux *ev = (EventHandlerLinux*)h;
68 void *ret;
69 pthread_join(ev->thr, &ret);
70 }
71
72 static volatile int ev_close =
0;
73
74 void ev_instance_close(EventHandler *h) {
75 EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
76 close(ev->port);
77 ev_close =
1;
78 }
79
80
81 static Event shutdown_event;
82 void ev_instance_shutdown(EventHandler *h) {
83 event_send(h, &shutdown_event);
84 }
85
86
87 void ev_handle_events(EventHandlerSolaris *ev) {
88 EventHandler *h = (EventHandler*)ev;
89 port_event_t events[
EV_MAX_EVENTS];
90 Event *finished[
EV_MAX_EVENTS];
91 struct timespec timeout;
92 timeout.tv_nsec =
0;
93 timeout.tv_sec =
600;
94
95 for(;;) {
96
97 uint_t nev =
1;
98 int ret = port_getn(ev->port, events,
EV_MAX_EVENTS, &nev, &timeout);
99 if(ret == -
1) {
100 if(errno !=
EINTR && errno !=
ETIME) {
101 if(!ev_close) {
102 log_ereport(
LOG_CATASTROPHE,
"port_getn failed: %s", strerror(errno));
103 }
104 break;
105 }
106 continue;
107 }
108
109 int numfinished =
0;
110 ev->numret =
0;
111 for(
int i=
0;i<nev;i++) {
112 Event *event = events[i].portev_user;
113 if(events[i].portev_source ==
PORT_SOURCE_AIO) {
114 aiocb_t *aiocb = (
aiocb_t*)events[i].portev_object;
115 if(event) {
116 aiocb_s *aio = (aiocb_s*)event->object;
117 aio->result = aiocb->aio_resultp.aio_return;
118 aio->result_errno = aiocb->aio_resultp.aio_errno;
119 if(event->fn) {
120 if(!event->fn(h, event) && event->finish) {
121 finished[numfinished++] = event;
122 }
123 }
124 }
125 free(aiocb);
126 }
else {
127 if(event->fn) {
128 if(event->fn(ev, event)) {
129
130
131
132
133
134 if(port_associate(
135 ev->port,
136 PORT_SOURCE_FD,
137 (
uintptr_t)event->object,
138 ev_convert2sys_events(event->events),
139 event))
140 {
141 perror(
"port_associate");
142 }
143 }
else if(event->finish) {
144 event->finish(h, event);
145 }
146 }
else if(event == &shutdown_event) {
147 ev_instance_close(h);
148 }
149 }
150 }
151
152 for(
int i=
0;i<numfinished;i++) {
153 Event *event = finished[i];
154
155 if(finished[i]->finish) {
156 finished[i]->finish(ev, event);
157 }
158 }
159
160 for(
int i=
0;i<ev->base.numret;i++) {
161 EVReturn ret = ev->base.fnreturn[i];
162 nsapi_saf_return(ret.sn, ret.rq, ret.ret);
163 }
164 }
165
166 free(ev);
167 }
168
169 int ev_convert2sys_events(
int events) {
170 int e =
0;
171 if((events &
EVENT_POLLIN) ==
EVENT_POLLIN) {
172 e |=
POLLIN;
173 }
174 if((events &
EVENT_POLLOUT) ==
EVENT_POLLOUT) {
175 e |=
POLLOUT;
176 }
177 return e;
178 }
179
180
181 int ev_pollin(EventHandler *h,
int fd, Event *event) {
182 EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
183 event->object = (
intptr_t)fd;
184 event->events =
EVENT_POLLIN;
185 return port_associate(
186 ev->port,
187 PORT_SOURCE_FD,
188 (
uintptr_t)fd,
189 POLLIN,
190 event);
191 }
192
193 int ev_pollout(EventHandler *h,
int fd, Event *event) {
194 EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
195 event->object = (
intptr_t)fd;
196 event->events =
EVENT_POLLOUT;
197 return port_associate(
198 ev->port,
199 PORT_SOURCE_FD,
200 (
uintptr_t)fd,
201 POLLOUT,
202 event);
203 }
204
205 int ev_remove_poll(EventHandler *h,
int fd) {
206 EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
207 return port_dissociate(ev->port,
PORT_SOURCE_FD, (
uintptr_t)fd);
208 }
209
210 int event_send(EventHandler *h, Event *event) {
211 EventHandlerSolaris *ev = (EventHandlerSolaris*)h;
212 event->object =
0;
213 event->events =
0;
214 return port_send(ev->port,
0, event);
215 }
216
217 static int ev_aio(
int fd, aiocb_s *cb, WSBool read) {
218 EventHandlerSolaris *ev = cb->evhandler;
219 if(!ev) {
220 return -
1;
221 }
222
223 aiocb_t *aiocb = malloc(
sizeof(
aiocb_t));
224 if(!aiocb) {
225 return -
1;
226 }
227 ZERO(aiocb,
sizeof(
aiocb_t));
228
229 aiocb->aio_fildes = fd;
230 aiocb->aio_buf = cb->buf;
231 aiocb->aio_nbytes = cb->nbytes;
232 aiocb->aio_offset = cb->offset;
233
234 port_notify_t *portnotify = malloc(
sizeof(
port_notify_t));
235 if(!portnotify) {
236 free(aiocb);
237 return -
1;
238 }
239 portnotify->portnfy_port = ev->port;
240 portnotify->portnfy_user = cb->event;
241 aiocb->aio_sigevent.sigev_notify =
SIGEV_PORT;
242 aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify;
243
244 if(read) {
245 return aio_read(aiocb);
246 }
else {
247 return aio_write(aiocb);
248 }
249 }
250
251 int ev_aioread(
int fd, aiocb_s *cb) {
252 return ev_aio(fd, cb,
TRUE);
253 }
254
255 int ev_aiowrite(
int fd, aiocb_s *cb) {
256 return ev_aio(fd, cb,
FALSE);
257 }
258
259
260 int event_pollin(EventHandler *ev,
SYS_NETFD fd, Event *event) {
261 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_IN, event);
262 }
263
264 int event_pollout(EventHandler *ev,
SYS_NETFD fd, Event *event) {
265 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_OUT, event);
266 }
267
268 int event_removepoll(EventHandler *ev,
SYS_NETFD fd) {
269 return ((IOStream*)fd)->poll(fd, ev,
IO_POLL_NONE,
NULL);
270 }
271