src/server/daemon/event_bsd.c

branch
aio
changeset 170
711d00eeed25
parent 133
87b405d61f64
child 187
4384bfbb7e26
equal deleted inserted replaced
165:6942a8c3e737 170:711d00eeed25
31 31
32 #include "../util/atomic.h" 32 #include "../util/atomic.h"
33 33
34 #include "event_bsd.h" 34 #include "event_bsd.h"
35 35
36 event_handler_t* evhandler_create(int numthreads) { 36 EVHandler* evhandler_create(EventHandlerConfig *cfg) {
37 event_handler_t *ev = malloc(sizeof(event_handler_t)); 37 EVHandler *ev = malloc(sizeof(EVHandler));
38 if(ev == NULL) { 38 ev->current = 0;
39 return NULL; 39 ev->instances = calloc(cfg->nthreads, sizeof(void*));
40 } 40 ev->numins = cfg->nthreads;
41 41
42 ev->ports = calloc(numthreads, sizeof(int)); 42 for(int i=0;i<cfg->nthreads;i++) {
43 if(ev->ports == NULL) { 43 EventHandler *handler = malloc(sizeof(EventHandler));
44 free(ev); 44 ev->instances[i] = handler;
45 return NULL; 45
46 } 46 handler->kqueue = kqueue();
47 ev->nports = numthreads; 47 if(handler->kqueue == 0) {
48 ev->lp = 0; 48 // TODO: error
49
50 /* create ports event threads */
51 for(int i=0;i<numthreads;i++) {
52 /* create port */
53 //ev->ports[i] = port_create();
54 ev->ports[i] = kqueue();
55 if(ev->ports[i] == 0) {
56 free(ev->ports);
57 free(ev);
58 return NULL; 49 return NULL;
59 } 50 }
60 51
61 /* 52 SYS_THREAD t = systhread_start(
62 * start a new handler thread 53 0,
63 * the thread needs the event port and a pointer to the event handler 54 0,
64 */ 55 (thrstartfunc)ev_handle_events,
65 ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t)); 56 handler);
66 if(conf == NULL) { 57 systhread_detach(t);
67 free(ev->ports);
68 free(ev);
69 return NULL;
70 }
71 conf->handler = ev;
72 conf->port = ev->ports[i];
73
74 systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
75 /* TODO: error handling */
76 } 58 }
77 59
78 return ev; 60 return ev;
79 } 61 }
80 62
81 void ev_handle_events(ev_thr_conf_t *conf) { 63
82 event_handler_t *ev = conf->handler; 64 void ev_handle_events(EventHandler *ev) {
83 int kq = conf->port;
84
85 free(conf);
86
87 struct timespec timeout; 65 struct timespec timeout;
88 timeout.tv_nsec = 0; 66 timeout.tv_nsec = 0;
89 timeout.tv_sec = 600; 67 timeout.tv_sec = 600;
90 68
91 struct kevent events[16]; 69 struct kevent events[64];
70 struct kevent changes[64];
71 int numchanges = 0;
92 72
93 for(;;) { 73 for(;;) {
94 // wait for events 74 // wait for events
95 int nev; 75 int nev = kevent(ev->kqueue, changes, numchanges, events, 64, &timeout);
96 nev = kevent(kq, NULL, 0, events, 16, &timeout);
97 if(nev == -1) { 76 if(nev == -1) {
98 // TODO: check for error 77 // TODO: check for error
99 perror("kevent"); 78 perror("kevent");
100 continue; 79 continue;
101 } 80 }
102 81
82 numchanges = 0;
103 for(int i=0;i<nev;i++) { 83 for(int i=0;i<nev;i++) {
104 event_t *event = (event_t*)events[i].udata; 84 Event *event = (Event*)events[i].udata;
105 if(event->fn) { 85 if(event->fn) {
106 int ep = event->poll; 86 int ep = event->events;
107 if(event->fn(ev, event)) { 87 if(event->fn(ev, event)) {
108 // TODO: reassociate? 88 if(event->events != ep) {
109 // TODO: check ep and event->poll 89 changes[numchanges++].filter = ev_convert2sys_events(ep);
90 }
110 } else if(event->finish) { 91 } else if(event->finish) {
92 changes[numchanges++].filter = ev_convert2sys_events(ep);
111 event->finish(ev, event); 93 event->finish(ev, event);
112 } 94 }
113 } 95 }
114 } 96 }
115 } 97 }
116 } 98 }
117 99
118 /* returns a event handler port */ 100 int ev_convert2sys_events(int events) {
119 int ev_get_port(event_handler_t *h) { 101 int e = 0;
120 int nps = h->nports; 102 if((events & EVENT_POLLIN) == EVENT_POLLIN) {
121 if(nps == 1) { 103 e |= EVFILT_READ;
122 return h->ports[0];
123 } 104 }
124 105 if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
125 int cp = h->lp % nps; 106 e |= EVFILT_WRITE;
126 ws_atomic_inc32(&h->lp); 107 }
127 108 return e;
128 return h->ports[cp];
129 } 109 }
130 110
131 int ev_pollin(event_handler_t *h, int fd, event_t *event) { 111 int ev_pollin(EventHandler *h, int fd, Event *event) {
132 event->poll = EVENT_POLLIN; 112 event->events = EVENT_POLLIN;
133 struct kevent kev; 113 struct kevent kev;
134 EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, event); 114 EV_SET(&kev, fd, EVFILT_READ, EV_ADD, 0, 0, event);
135 return kevent(ev_get_port(h), &kev, 1, NULL, 0, NULL); 115 return kevent(h->kqueue, &kev, 1, NULL, 0, NULL);
136 } 116 }
137 117
138 int ev_pollout(event_handler_t *h, int fd, event_t *event) { 118 int ev_pollout(EventHandler *h, int fd, Event *event) {
139 event->poll = EVENT_POLLOUT; 119 event->events = EVENT_POLLOUT;
140 struct kevent kev; 120 struct kevent kev;
141 EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, event); 121 EV_SET(&kev, fd, EVFILT_WRITE, EV_ADD, 0, 0, event);
142 return kevent(ev_get_port(h), &kev, 1, NULL, 0, NULL); 122 return kevent(h->kqueue, &kev, 1, NULL, 0, NULL);
143 } 123 }
144 124
145 int evt_send(event_handler_t *h, event_t *event) { 125 int evt_send(EventHandler *h, Event *event) {
146 return 0; 126 return 0;
147 } 127 }

mercurial