src/server/daemon/event_solaris.c

changeset 193
aa8393527b1e
parent 162
b169992137a8
parent 187
4384bfbb7e26
child 443
ef3c8a0e1fee
equal deleted inserted replaced
183:f33974f0dce0 193:aa8393527b1e
28 28
29 #include <stdio.h> 29 #include <stdio.h>
30 #include <stdlib.h> 30 #include <stdlib.h>
31 #include <atomic.h> 31 #include <atomic.h>
32 32
33 #include "../util/io.h"
34
33 #include "event_solaris.h" 35 #include "event_solaris.h"
34 36
35 event_handler_t* evhandler_create(int numthreads) { 37 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
36 event_handler_t *ev = malloc(sizeof(event_handler_t)); 38 EVHandler *ev = malloc(sizeof(EVHandler));
37 if(ev == NULL) { 39 ev->current = 0;
38 return NULL; 40 ev->instances = calloc(cfg->nthreads, sizeof(void*));
39 } 41 ev->numins = cfg->nthreads;
40 42
41 ev->ports = calloc(numthreads, sizeof(int)); 43 for(int i=0;i<cfg->nthreads;i++) {
42 if(ev->ports == NULL) { 44 EventHandler *handler = malloc(sizeof(EventHandler));
43 free(ev); 45 ev->instances[i] = handler;
44 return NULL; 46
45 } 47 handler->port = port_create();
46 ev->nports = numthreads; 48 if(handler->port == 0) {
47 ev->lp = 0; 49 // TODO: error
48
49 /* create ports event threads */
50 for(int i=0;i<numthreads;i++) {
51 /* create port */
52 ev->ports[i] = port_create();
53 if(ev->ports[i] == 0) {
54 free(ev->ports);
55 free(ev);
56 return NULL; 50 return NULL;
57 } 51 }
58 52
59 /* 53 SYS_THREAD t = systhread_start(
60 * start a new handler thread 54 0,
61 * the thread needs the event port and a pointer to the event handler 55 0,
62 */ 56 (thrstartfunc)ev_handle_events,
63 ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); 57 handler);
64 if(conf == NULL) { 58 systhread_detach(t);
65 free(ev->ports);
66 free(ev);
67 return NULL;
68 }
69 conf->handler = ev;
70 conf->port = ev->ports[i];
71
72 systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
73 // TODO: error handling
74 } 59 }
75 60
76 return ev; 61 return ev;
77 } 62 }
78 63
79 void ev_handle_events(ev_thr_conf_t *conf) { 64 void ev_handle_events(EventHandler *ev) {
80 event_handler_t *ev = conf->handler; 65 port_event_t events[64];
81 int port = conf->port;
82
83 free(conf);
84
85 port_event_t events[16];
86 struct timespec timeout; 66 struct timespec timeout;
87 timeout.tv_nsec = 0; 67 timeout.tv_nsec = 0;
88 timeout.tv_sec = 600; 68 timeout.tv_sec = 600;
89 69
90 for(;;) { 70 for(;;) {
91 // wait for events 71 // wait for events
92 uint_t nev = 1; 72 uint_t nev = 1;
93 int ret = port_getn(port, events, 16, &nev, &timeout); 73 int ret = port_getn(ev->port, events, 64, &nev, &timeout);
94 if(ret == -1) { 74 if(ret == -1) {
95 // TODO: check for error 75 // TODO: check for error
96 perror("port_getn"); 76 perror("port_getn");
97 continue; 77 continue;
98 } 78 }
99 79
100 for(int i=0;i<nev;i++) { 80 for(int i=0;i<nev;i++) {
101 event_t *event = events[i].portev_user; 81 Event *event = events[i].portev_user;
102 if(event->fn) { 82 if(events[i].portev_source == PORT_SOURCE_AIO) {
103 int saved_ev = event->poll; 83 aiocb_t *aiocb = (aiocb_t*)events[i].portev_object;
104 if(event->fn(ev, event)) { 84 if(event) {
105 /* 85 aiocb_s *aio = (aiocb_s*)event->object;
106 * on solaris we have to reassociate the fd after 86 aio->result = aiocb->aio_resultp.aio_return;
107 * each event 87 aio->result_errno = aiocb->aio_resultp.aio_errno;
108 * we do this if the event function returns 1 88 if(event->fn) {
109 */ 89 if(!event->fn(ev, event) && event->finish) {
110 90 event->finish(ev, event);
111 if(event->poll != saved_ev) {
112 // event type changed
113 int ne = 0;
114 if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) {
115 ne |= POLLIN;
116 }
117 if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) {
118 ne |= POLLOUT;
119 } 91 }
120 } 92 }
121
122 if(ev_poll(ev, event)) {
123 perror("port_associate");
124 }
125 } else if(event->finish) {
126 event->finish(ev, event);
127 } 93 }
128 } 94 free(aiocb);
95 } else {
96 if(event->fn) {
97 if(event->fn(ev, event)) {
98 /*
99 * on solaris we have to reassociate the fd after
100 * each event
101 * we do this if the event function returns 1
102 */
103 if(port_associate(
104 ev->port,
105 PORT_SOURCE_FD,
106 (uintptr_t)event->object,
107 ev_convert2sys_events(event->events),
108 event))
109 {
110 perror("port_associate");
111 }
112 } else if(event->finish) {
113 event->finish(ev, event);
114 }
115 }
116 }
129 } 117 }
130 } 118 }
131 } 119 }
132 120
133 // returns a event handler port 121 int ev_convert2sys_events(int events) {
134 int ev_get_port(event_handler_t *h) { 122 int e = 0;
135 int nps = h->nports; 123 if((events & EVENT_POLLIN) == EVENT_POLLIN) {
136 if(nps == 1) { 124 e |= POLLIN;
137 return h->ports[0]; 125 }
138 } 126 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
139 127 e |= POLLOUT;
140 int cp = h->lp % nps; 128 }
141 atomic_inc_32(&h->lp); 129 return e;
142 130 }
143 return h->ports[cp]; 131
144 } 132
145 133 int ev_pollin(EventHandler *h, int fd, Event *event) {
146 int ev_pollin(event_handler_t *h, int fd, event_t *event) {
147 event->object = (intptr_t)fd; 134 event->object = (intptr_t)fd;
148 event->events = POLLIN; 135 event->events = EVENT_POLLIN;
149 event->poll = EVENT_POLLIN;
150 return port_associate( 136 return port_associate(
151 ev_get_port(h), 137 h->port,
152 PORT_SOURCE_FD, 138 PORT_SOURCE_FD,
153 (uintptr_t)fd, 139 (uintptr_t)fd,
154 POLLIN, 140 POLLIN,
155 event); 141 event);
156 } 142 }
157 143
158 int ev_pollout(event_handler_t *h, int fd, event_t *event) { 144 int ev_pollout(EventHandler *h, int fd, Event *event) {
159 event->object = (intptr_t)fd; 145 event->object = (intptr_t)fd;
160 event->events = POLLOUT; 146 event->events = EVENT_POLLOUT;
161 event->poll = EVENT_POLLOUT;
162 return port_associate( 147 return port_associate(
163 ev_get_port(h), 148 h->port,
164 PORT_SOURCE_FD, 149 PORT_SOURCE_FD,
165 (uintptr_t)fd, 150 (uintptr_t)fd,
166 POLLOUT, 151 POLLOUT,
167 event); 152 event);
168 } 153 }
169 154
170 int ev_poll(event_handler_t *h, event_t *event) { 155 int ev_remove_poll(EventHandler *h, int fd) {
171 return port_associate( 156 return port_dissociate(h->port, PORT_SOURCE_FD, (uintptr_t)fd);
172 ev_get_port(h), 157 }
173 PORT_SOURCE_FD, 158
174 event->object, 159 int event_send(EventHandler *h, Event *event) {
175 event->events,
176 event);
177 }
178
179 int evt_send(event_handler_t *h, event_t *event) {
180 event->object = 0; 160 event->object = 0;
181 return port_send(ev_get_port(h), 0, event); 161 event->events = 0;
182 } 162 return port_send(h->port, 0, event);
163 }
164
165 static int ev_aio(int fd, aiocb_s *cb, WSBool read) {
166 EventHandler *ev = cb->evhandler;
167 if(!ev) {
168 return -1;
169 }
170
171 aiocb_t *aiocb = malloc(sizeof(aiocb_t));
172 if(!aiocb) {
173 return -1;
174 }
175 ZERO(aiocb, sizeof(aiocb_t));
176
177 aiocb->aio_fildes = fd;
178 aiocb->aio_buf = cb->buf;
179 aiocb->aio_nbytes = cb->nbytes;
180 aiocb->aio_offset = cb->offset;
181
182 port_notify_t *portnotify = malloc(sizeof(port_notify_t));
183 if(!portnotify) {
184 free(aiocb);
185 return -1;
186 }
187 portnotify->portnfy_port = ev->port;
188 portnotify->portnfy_user = cb->event;
189 aiocb->aio_sigevent.sigev_notify = SIGEV_PORT;
190 aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify;
191
192 if(read) {
193 return aio_read(aiocb);
194 } else {
195 return aio_write(aiocb);
196 }
197 }
198
199 int ev_aioread(int fd, aiocb_s *cb) {
200 return ev_aio(fd, cb, TRUE);
201 }
202
203 int ev_aiowrite(int fd, aiocb_s *cb) {
204 return ev_aio(fd, cb, FALSE);
205 }
206
207
208 int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
209 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event);
210 }
211
212 int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
213 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event);
214 }
215
216 int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
217 return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL);
218 }

mercurial