28 |
28 |
29 #include <stdio.h> |
29 #include <stdio.h> |
30 #include <stdlib.h> |
30 #include <stdlib.h> |
31 #include <atomic.h> |
31 #include <atomic.h> |
32 |
32 |
|
33 #include "../util/io.h" |
|
34 |
33 #include "event_solaris.h" |
35 #include "event_solaris.h" |
34 |
36 |
35 event_handler_t* evhandler_create(int numthreads) { |
37 EVHandler* evhandler_create(EventHandlerConfig *cfg) { |
36 event_handler_t *ev = malloc(sizeof(event_handler_t)); |
38 EVHandler *ev = malloc(sizeof(EVHandler)); |
37 if(ev == NULL) { |
39 ev->current = 0; |
38 return NULL; |
40 ev->instances = calloc(cfg->nthreads, sizeof(void*)); |
39 } |
41 ev->numins = cfg->nthreads; |
40 |
42 |
41 ev->ports = calloc(numthreads, sizeof(int)); |
43 for(int i=0;i<cfg->nthreads;i++) { |
42 if(ev->ports == NULL) { |
44 EventHandler *handler = malloc(sizeof(EventHandler)); |
43 free(ev); |
45 ev->instances[i] = handler; |
44 return NULL; |
46 |
45 } |
47 handler->port = port_create(); |
46 ev->nports = numthreads; |
48 if(handler->port == 0) { |
47 ev->lp = 0; |
49 // TODO: error |
48 |
|
49 /* create ports event threads */ |
|
50 for(int i=0;i<numthreads;i++) { |
|
51 /* create port */ |
|
52 ev->ports[i] = port_create(); |
|
53 if(ev->ports[i] == 0) { |
|
54 free(ev->ports); |
|
55 free(ev); |
|
56 return NULL; |
50 return NULL; |
57 } |
51 } |
58 |
52 |
59 /* |
53 SYS_THREAD t = systhread_start( |
60 * start a new handler thread |
54 0, |
61 * the thread needs the event port and a pointer to the event handler |
55 0, |
62 */ |
56 (thrstartfunc)ev_handle_events, |
63 ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); |
57 handler); |
64 if(conf == NULL) { |
58 systhread_detach(t); |
65 free(ev->ports); |
|
66 free(ev); |
|
67 return NULL; |
|
68 } |
|
69 conf->handler = ev; |
|
70 conf->port = ev->ports[i]; |
|
71 |
|
72 systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf); |
|
73 // TODO: error handling |
|
74 } |
59 } |
75 |
60 |
76 return ev; |
61 return ev; |
77 } |
62 } |
78 |
63 |
79 void ev_handle_events(ev_thr_conf_t *conf) { |
64 void ev_handle_events(EventHandler *ev) { |
80 event_handler_t *ev = conf->handler; |
65 port_event_t events[64]; |
81 int port = conf->port; |
|
82 |
|
83 free(conf); |
|
84 |
|
85 port_event_t events[16]; |
|
86 struct timespec timeout; |
66 struct timespec timeout; |
87 timeout.tv_nsec = 0; |
67 timeout.tv_nsec = 0; |
88 timeout.tv_sec = 600; |
68 timeout.tv_sec = 600; |
89 |
69 |
90 for(;;) { |
70 for(;;) { |
91 // wait for events |
71 // wait for events |
92 uint_t nev = 1; |
72 uint_t nev = 1; |
93 int ret = port_getn(port, events, 16, &nev, &timeout); |
73 int ret = port_getn(ev->port, events, 64, &nev, &timeout); |
94 if(ret == -1) { |
74 if(ret == -1) { |
95 // TODO: check for error |
75 // TODO: check for error |
96 perror("port_getn"); |
76 perror("port_getn"); |
97 continue; |
77 continue; |
98 } |
78 } |
99 |
79 |
100 for(int i=0;i<nev;i++) { |
80 for(int i=0;i<nev;i++) { |
101 event_t *event = events[i].portev_user; |
81 Event *event = events[i].portev_user; |
102 if(event->fn) { |
82 if(events[i].portev_source == PORT_SOURCE_AIO) { |
103 int saved_ev = event->poll; |
83 aiocb_t *aiocb = (aiocb_t*)events[i].portev_object; |
104 if(event->fn(ev, event)) { |
84 if(event) { |
105 /* |
85 aiocb_s *aio = (aiocb_s*)event->object; |
106 * on solaris we have to reassociate the fd after |
86 aio->result = aiocb->aio_resultp.aio_return; |
107 * each event |
87 aio->result_errno = aiocb->aio_resultp.aio_errno; |
108 * we do this if the event function returns 1 |
88 if(event->fn) { |
109 */ |
89 if(!event->fn(ev, event) && event->finish) { |
110 |
90 event->finish(ev, event); |
111 if(event->poll != saved_ev) { |
|
112 // event type changed |
|
113 int ne = 0; |
|
114 if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { |
|
115 ne |= POLLIN; |
|
116 } |
|
117 if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) { |
|
118 ne |= POLLOUT; |
|
119 } |
91 } |
120 } |
92 } |
121 |
|
122 if(ev_poll(ev, event)) { |
|
123 perror("port_associate"); |
|
124 } |
|
125 } else if(event->finish) { |
|
126 event->finish(ev, event); |
|
127 } |
93 } |
128 } |
94 free(aiocb); |
|
95 } else { |
|
96 if(event->fn) { |
|
97 if(event->fn(ev, event)) { |
|
98 /* |
|
99 * on solaris we have to reassociate the fd after |
|
100 * each event |
|
101 * we do this if the event function returns 1 |
|
102 */ |
|
103 if(port_associate( |
|
104 ev->port, |
|
105 PORT_SOURCE_FD, |
|
106 (uintptr_t)event->object, |
|
107 ev_convert2sys_events(event->events), |
|
108 event)) |
|
109 { |
|
110 perror("port_associate"); |
|
111 } |
|
112 } else if(event->finish) { |
|
113 event->finish(ev, event); |
|
114 } |
|
115 } |
|
116 } |
129 } |
117 } |
130 } |
118 } |
131 } |
119 } |
132 |
120 |
133 // returns a event handler port |
121 int ev_convert2sys_events(int events) { |
134 int ev_get_port(event_handler_t *h) { |
122 int e = 0; |
135 int nps = h->nports; |
123 if((events & EVENT_POLLIN) == EVENT_POLLIN) { |
136 if(nps == 1) { |
124 e |= POLLIN; |
137 return h->ports[0]; |
125 } |
138 } |
126 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) { |
139 |
127 e |= POLLOUT; |
140 int cp = h->lp % nps; |
128 } |
141 atomic_inc_32(&h->lp); |
129 return e; |
142 |
130 } |
143 return h->ports[cp]; |
131 |
144 } |
132 |
145 |
133 int ev_pollin(EventHandler *h, int fd, Event *event) { |
146 int ev_pollin(event_handler_t *h, int fd, event_t *event) { |
|
147 event->object = (intptr_t)fd; |
134 event->object = (intptr_t)fd; |
148 event->events = POLLIN; |
135 event->events = EVENT_POLLIN; |
149 event->poll = EVENT_POLLIN; |
|
150 return port_associate( |
136 return port_associate( |
151 ev_get_port(h), |
137 h->port, |
152 PORT_SOURCE_FD, |
138 PORT_SOURCE_FD, |
153 (uintptr_t)fd, |
139 (uintptr_t)fd, |
154 POLLIN, |
140 POLLIN, |
155 event); |
141 event); |
156 } |
142 } |
157 |
143 |
158 int ev_pollout(event_handler_t *h, int fd, event_t *event) { |
144 int ev_pollout(EventHandler *h, int fd, Event *event) { |
159 event->object = (intptr_t)fd; |
145 event->object = (intptr_t)fd; |
160 event->events = POLLOUT; |
146 event->events = EVENT_POLLOUT; |
161 event->poll = EVENT_POLLOUT; |
|
162 return port_associate( |
147 return port_associate( |
163 ev_get_port(h), |
148 h->port, |
164 PORT_SOURCE_FD, |
149 PORT_SOURCE_FD, |
165 (uintptr_t)fd, |
150 (uintptr_t)fd, |
166 POLLOUT, |
151 POLLOUT, |
167 event); |
152 event); |
168 } |
153 } |
169 |
154 |
170 int ev_poll(event_handler_t *h, event_t *event) { |
155 int ev_remove_poll(EventHandler *h, int fd) { |
171 return port_associate( |
156 return port_dissociate(h->port, PORT_SOURCE_FD, (uintptr_t)fd); |
172 ev_get_port(h), |
157 } |
173 PORT_SOURCE_FD, |
158 |
174 event->object, |
159 int event_send(EventHandler *h, Event *event) { |
175 event->events, |
|
176 event); |
|
177 } |
|
178 |
|
179 int evt_send(event_handler_t *h, event_t *event) { |
|
180 event->object = 0; |
160 event->object = 0; |
181 return port_send(ev_get_port(h), 0, event); |
161 event->events = 0; |
182 } |
162 return port_send(h->port, 0, event); |
|
163 } |
|
164 |
|
165 static int ev_aio(int fd, aiocb_s *cb, WSBool read) { |
|
166 EventHandler *ev = cb->evhandler; |
|
167 if(!ev) { |
|
168 return -1; |
|
169 } |
|
170 |
|
171 aiocb_t *aiocb = malloc(sizeof(aiocb_t)); |
|
172 if(!aiocb) { |
|
173 return -1; |
|
174 } |
|
175 ZERO(aiocb, sizeof(aiocb_t)); |
|
176 |
|
177 aiocb->aio_fildes = fd; |
|
178 aiocb->aio_buf = cb->buf; |
|
179 aiocb->aio_nbytes = cb->nbytes; |
|
180 aiocb->aio_offset = cb->offset; |
|
181 |
|
182 port_notify_t *portnotify = malloc(sizeof(port_notify_t)); |
|
183 if(!portnotify) { |
|
184 free(aiocb); |
|
185 return -1; |
|
186 } |
|
187 portnotify->portnfy_port = ev->port; |
|
188 portnotify->portnfy_user = cb->event; |
|
189 aiocb->aio_sigevent.sigev_notify = SIGEV_PORT; |
|
190 aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify; |
|
191 |
|
192 if(read) { |
|
193 return aio_read(aiocb); |
|
194 } else { |
|
195 return aio_write(aiocb); |
|
196 } |
|
197 } |
|
198 |
|
199 int ev_aioread(int fd, aiocb_s *cb) { |
|
200 return ev_aio(fd, cb, TRUE); |
|
201 } |
|
202 |
|
203 int ev_aiowrite(int fd, aiocb_s *cb) { |
|
204 return ev_aio(fd, cb, FALSE); |
|
205 } |
|
206 |
|
207 |
|
208 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) { |
|
209 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event); |
|
210 } |
|
211 |
|
212 int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) { |
|
213 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event); |
|
214 } |
|
215 |
|
216 int event_removepoll(EventHandler *ev, SYS_NETFD fd) { |
|
217 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL); |
|
218 } |