src/server/daemon/event_solaris.c

branch
aio
changeset 159
9ba9f8befa80
parent 153
85320d8b5d5c
child 172
5580517faafc
equal deleted inserted replaced
157:a0c8e752490d 159:9ba9f8befa80
30 #include <stdlib.h> 30 #include <stdlib.h>
31 #include <atomic.h> 31 #include <atomic.h>
32 32
33 #include "event_solaris.h" 33 #include "event_solaris.h"
34 34
35 event_handler_t* evhandler_create(int numthreads) { 35 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
36 event_handler_t *ev = malloc(sizeof(event_handler_t)); 36 EVHandler *ev = malloc(sizeof(EVHandler));
37 if(ev == NULL) { 37 ev->current = 0;
38 return NULL; 38 ev->instances = calloc(cfg->nthreads, sizeof(void*));
39 } 39 ev->numins = cfg->nthreads;
40 40
41 ev->ports = calloc(numthreads, sizeof(int)); 41 for(int i=0;i<cfg->nthreads;i++) {
42 if(ev->ports == NULL) { 42 EventHandler *handler = malloc(sizeof(EventHandler));
43 free(ev); 43 ev->instances[i] = handler;
44 return NULL; 44
45 } 45 handler->port = port_create();
46 ev->nports = numthreads; 46 if(handler->port == 0) {
47 ev->lp = 0; 47 // TODO: error
48
49 /* create ports event threads */
50 for(int i=0;i<numthreads;i++) {
51 /* create port */
52 ev->ports[i] = port_create();
53 if(ev->ports[i] == 0) {
54 free(ev->ports);
55 free(ev);
56 return NULL; 48 return NULL;
57 } 49 }
58 50
59 /* 51 SYS_THREAD t = systhread_start(
60 * start a new handler thread 52 0,
61 * the thread needs the event port and a pointer to the event handler 53 0,
62 */ 54 (thrstartfunc)ev_handle_events,
63 ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); 55 handler);
64 if(conf == NULL) { 56 systhread_detach(t);
65 free(ev->ports);
66 free(ev);
67 return NULL;
68 }
69 conf->handler = ev;
70 conf->port = ev->ports[i];
71
72 systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
73 // TODO: error handling
74 } 57 }
75 58
76 return ev; 59 return ev;
77 } 60 }
78 61
79 void ev_handle_events(ev_thr_conf_t *conf) { 62 void ev_handle_events(EventHandler *ev) {
80 event_handler_t *ev = conf->handler; 63 port_event_t events[64];
81 int port = conf->port;
82
83 free(conf);
84
85 port_event_t events[16];
86 struct timespec timeout; 64 struct timespec timeout;
87 timeout.tv_nsec = 0; 65 timeout.tv_nsec = 0;
88 timeout.tv_sec = 600; 66 timeout.tv_sec = 600;
89 67
90 for(;;) { 68 for(;;) {
91 // wait for events 69 // wait for events
92 uint_t nev = 1; 70 uint_t nev = 1;
93 int ret = port_getn(port, events, 16, &nev, &timeout); 71 int ret = port_getn(ev->port, events, 64, &nev, &timeout);
94 if(ret == -1) { 72 if(ret == -1) {
95 // TODO: check for error 73 // TODO: check for error
96 perror("port_getn"); 74 perror("port_getn");
97 continue; 75 continue;
98 } 76 }
99 77
100 for(int i=0;i<nev;i++) { 78 for(int i=0;i<nev;i++) {
101 event_t *event = events[i].portev_user; 79 Event *event = events[i].portev_user;
102 if(event->fn) { 80 if(event->fn) {
103 int saved_ev = event->poll;
104 if(event->fn(ev, event)) { 81 if(event->fn(ev, event)) {
105 /* 82 /*
106 * on solaris we have to reassociate the fd after 83 * on solaris we have to reassociate the fd after
107 * each event 84 * each event
108 * we do this if the event function returns 1 85 * we do this if the event function returns 1
109 */ 86 */
110 87 if(port_associate(
111 if(event->poll != saved_ev) { 88 ev->port,
112 // event type changed 89 PORT_SOURCE_FD,
113 int ne = 0; 90 (uintptr_t)event->object,
114 if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) { 91 ev_convert2sys_events(event->events),
115 ne |= POLLIN; 92 event))
116 } 93 {
117 if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) {
118 ne |= POLLOUT;
119 }
120 }
121
122 if(ev_poll(ev, event)) {
123 perror("port_associate"); 94 perror("port_associate");
124 } 95 }
125 } else if(event->finish) { 96 } else if(event->finish) {
126 event->finish(ev, event); 97 event->finish(ev, event);
127 } 98 }
128 } 99 }
129 } 100 }
130 } 101 }
131 } 102 }
132 103
133 // returns a event handler port 104 int ev_convert2sys_events(int events) {
134 int ev_get_port(event_handler_t *h) { 105 int e = 0;
135 int nps = h->nports; 106 if((events & EVENT_POLLIN) == EVENT_POLLIN) {
136 if(nps == 1) { 107 e |= POLLIN;
137 return h->ports[0];
138 } 108 }
139 109 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
140 int cp = h->lp % nps; 110 e |= POLLOUT;
141 atomic_inc_32(&h->lp); 111 }
142 112 return e;
143 return h->ports[cp];
144 } 113 }
145 114
146 int ev_pollin(event_handler_t *h, int fd, event_t *event) { 115
116 int ev_pollin(EventHandler *h, int fd, Event *event) {
147 event->object = (intptr_t)fd; 117 event->object = (intptr_t)fd;
148 event->events = POLLIN; 118 event->events = EVENT_POLLIN;
149 event->poll = EVENT_POLLIN;
150 return port_associate( 119 return port_associate(
151 ev_get_port(h), 120 h->port,
152 PORT_SOURCE_FD, 121 PORT_SOURCE_FD,
153 (uintptr_t)fd, 122 (uintptr_t)fd,
154 POLLIN, 123 POLLIN,
155 event); 124 event);
156 } 125 }
157 126
158 int ev_pollout(event_handler_t *h, int fd, event_t *event) { 127 int ev_pollout(EventHandler *h, int fd, Event *event) {
159 event->object = (intptr_t)fd; 128 event->object = (intptr_t)fd;
160 event->events = POLLOUT; 129 event->events = EVENT_POLLOUT;
161 event->poll = EVENT_POLLOUT;
162 return port_associate( 130 return port_associate(
163 ev_get_port(h), 131 h->port,
164 PORT_SOURCE_FD, 132 PORT_SOURCE_FD,
165 (uintptr_t)fd, 133 (uintptr_t)fd,
166 POLLOUT, 134 POLLOUT,
167 event); 135 event);
168 } 136 }
169 137
170 int ev_poll(event_handler_t *h, event_t *event) { 138 int evt_send(EventHandler *h, Event *event) {
171 return port_associate( 139 event->object = 0;
172 ev_get_port(h), 140 event->events = 0;
173 PORT_SOURCE_FD, 141 return port_send(h->port, 0, event);
174 event->object,
175 event->events,
176 event);
177 } 142 }
178
179 int evt_send(event_handler_t *h, event_t *event) {
180 event->object = 0;
181 return port_send(ev_get_port(h), 0, event);
182 }

mercurial