31 |
31 |
#include <unistd.h>

#include "../util/atomic.h"

#include "event_bsd.h"
35 |
35 |
/*
 * Releases a partially constructed handler: closes the kqueue of the first
 * `count` instances, frees them, then frees the instance array and `ev`.
 * Must only be used while no worker thread has been started.
 */
static void evhandler_discard(EVHandler *ev, int count) {
    for(int i=0;i<count;i++) {
        EventHandler *handler = ev->instances[i];
        close(handler->kqueue);
        free(handler);
    }
    free(ev->instances);
    free(ev);
}

/*
 * Creates an event handler with cfg->nthreads instances. Every instance
 * owns one kqueue and one detached thread running ev_handle_events().
 *
 * cfg: configuration; only cfg->nthreads is read here
 *
 * Returns the new handler, or NULL if cfg is NULL, cfg->nthreads is
 * negative, an allocation fails or a kqueue cannot be created. On failure
 * everything acquired so far is released again.
 */
EVHandler* evhandler_create(EventHandlerConfig *cfg) {
    if(cfg == NULL || cfg->nthreads < 0) {
        return NULL;
    }
    int nthreads = cfg->nthreads;

    EVHandler *ev = malloc(sizeof(EVHandler));
    if(ev == NULL) {
        return NULL;
    }
    ev->current = 0;
    ev->numins = nthreads;
    ev->instances = calloc(nthreads, sizeof(void*));
    /* calloc may legitimately return NULL for a zero-sized request */
    if(ev->instances == NULL && nthreads > 0) {
        free(ev);
        return NULL;
    }

    /*
     * First create all instances, then start the threads. This way a
     * failure can be cleaned up without freeing memory that an already
     * running thread is using.
     */
    for(int i=0;i<nthreads;i++) {
        EventHandler *handler = malloc(sizeof(EventHandler));
        if(handler == NULL) {
            evhandler_discard(ev, i);
            return NULL;
        }

        /* kqueue() reports failure with -1; 0 is a valid descriptor */
        handler->kqueue = kqueue();
        if(handler->kqueue == -1) {
            free(handler);
            evhandler_discard(ev, i);
            return NULL;
        }
        ev->instances[i] = handler;
    }

    for(int i=0;i<nthreads;i++) {
        /*
         * TODO: error handling - how systhread_start reports a failure is
         * not visible from this file, so its result is passed on unchecked
         */
        SYS_THREAD t = systhread_start(
                0,
                0,
                (thrstartfunc)ev_handle_events,
                ev->instances[i]);
        systhread_detach(t);
    }

    return ev;
}
80 |
62 |
/*
 * Appends one EV_DELETE change for every filter named in the EVENT_POLL*
 * mask `events`, using the descriptor of the event that fired.
 * Returns the new number of queued changes. The caller guarantees room
 * for two more entries.
 */
static int ev_queue_filter_removal(
        struct kevent *changes,
        int numchanges,
        const struct kevent *fired,
        int events)
{
    if((events & EVENT_POLLIN) == EVENT_POLLIN) {
        EV_SET(&changes[numchanges], fired->ident, EVFILT_READ, EV_DELETE, 0, 0, NULL);
        numchanges++;
    }
    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
        EV_SET(&changes[numchanges], fired->ident, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
        numchanges++;
    }
    return numchanges;
}

/*
 * Event loop of one handler instance. Never returns.
 *
 * Waits on ev->kqueue and calls the fn callback of the Event stored in the
 * udata of every returned kevent:
 *  - fn returns non-zero: the event stays active. Filters that were
 *    requested before the call but are no longer set in event->events
 *    are queued for removal.
 *  - fn returns 0 and a finish callback is set: all previously requested
 *    filters are queued for removal, then finish is called.
 * Queued removals are handed to the kernel with the next kevent() call.
 */
void ev_handle_events(EventHandler *ev) {
    /* every returned event can queue at most two removals (read + write) */
    enum { MAX_EVENTS = 64, MAX_CHANGES = 2 * MAX_EVENTS };

    struct timespec timeout;
    timeout.tv_nsec = 0;
    timeout.tv_sec = 600;

    struct kevent events[MAX_EVENTS];
    struct kevent changes[MAX_CHANGES];
    int numchanges = 0;

    for(;;) {
        // apply queued changes and wait for events
        int nev = kevent(ev->kqueue, changes, numchanges, events, MAX_EVENTS, &timeout);
        if(nev == -1) {
            // TODO: check for error
            // the queued changes are kept and submitted again
            perror("kevent");
            continue;
        }

        numchanges = 0;
        for(int i=0;i<nev;i++) {
            /*
             * A change that could not be applied comes back as an entry
             * with EV_ERROR set; it carries no Event and must be skipped.
             */
            if(events[i].flags & EV_ERROR) {
                continue;
            }

            Event *event = (Event*)events[i].udata;
            if(event == NULL || !event->fn) {
                continue;
            }

            // the callback may change event->events, remember the old mask
            int ep = event->events;
            if(event->fn(ev, event)) {
                // remove only what the callback no longer asks for
                int stale = ep & ~event->events;
                numchanges = ev_queue_filter_removal(
                        changes, numchanges, &events[i], stale);
            } else if(event->finish) {
                /*
                 * queue the removal before finish runs, the event must not
                 * be touched afterwards
                 * NOTE(review): if finish closes the descriptor the
                 * deferred removal can fail or, after descriptor reuse,
                 * hit a new registration - confirm with the callers
                 */
                numchanges = ev_queue_filter_removal(
                        changes, numchanges, &events[i], ep);
                event->finish(ev, event);
            }
        }
    }
}
117 |
99 |
/*
 * Converts an EVENT_POLL* mask to a kqueue filter.
 *
 * EVFILT_* values are filter identifiers, not bit flags, so they cannot be
 * combined with a bitwise or; a single int can name exactly one filter.
 * If both EVENT_POLLIN and EVENT_POLLOUT are set, EVFILT_READ is returned.
 * Callers that need both filters have to test the mask themselves.
 *
 * Returns EVFILT_READ, EVFILT_WRITE, or 0 if neither flag is set.
 */
int ev_convert2sys_events(int events) {
    if((events & EVENT_POLLIN) == EVENT_POLLIN) {
        return EVFILT_READ;
    }
    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
        return EVFILT_WRITE;
    }
    return 0;
}
130 |
110 |
/*
 * Registers fd for read events on the kqueue of handler h.
 *
 * event->events is set to EVENT_POLLIN before the registration and the
 * event pointer is stored as the udata of the kevent.
 *
 * Returns the result of the kevent() call.
 */
int ev_pollin(EventHandler *h, int fd, Event *event) {
    struct kevent change;

    // record the requested events before the kernel can deliver any
    event->events = EVENT_POLLIN;

    EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, event);
    int ret = kevent(h->kqueue, &change, 1, NULL, 0, NULL);
    return ret;
}
137 |
117 |
/*
 * Registers fd for write events on the kqueue of handler h.
 *
 * event->events is set to EVENT_POLLOUT before the registration and the
 * event pointer is stored as the udata of the kevent.
 *
 * Returns the result of the kevent() call.
 */
int ev_pollout(EventHandler *h, int fd, Event *event) {
    struct kevent change;

    // record the requested events before the kernel can deliver any
    event->events = EVENT_POLLOUT;

    EV_SET(&change, fd, EVFILT_WRITE, EV_ADD, 0, 0, event);
    int ret = kevent(h->kqueue, &change, 1, NULL, 0, NULL);
    return ret;
}
144 |
124 |
145 int evt_send(event_handler_t *h, event_t *event) { |
125 int event_send(EventHandler *h, Event *event) { |
146 return 0; |
126 return 0; |
147 } |
127 } |