src/server/daemon/event_solaris.c

changeset 193
aa8393527b1e
parent 162
b169992137a8
parent 187
4384bfbb7e26
child 443
ef3c8a0e1fee
--- a/src/server/daemon/event_solaris.c	Thu Aug 31 16:29:49 2017 +0200
+++ b/src/server/daemon/event_solaris.c	Sat Jan 13 19:01:00 2018 +0100
@@ -30,59 +30,39 @@
 #include <stdlib.h>
 #include <atomic.h>
 
+#include "../util/io.h"
+
 #include "event_solaris.h"
 
-event_handler_t* evhandler_create(int numthreads) {
-    event_handler_t *ev = malloc(sizeof(event_handler_t));
-    if(ev == NULL) {
-        return NULL;
-    }
+EVHandler* evhandler_create(EventHandlerConfig *cfg) {
+    EVHandler *ev = malloc(sizeof(EVHandler));
+    ev->current = 0;
+    ev->instances = calloc(cfg->nthreads, sizeof(void*));
+    ev->numins = cfg->nthreads;
     
-    ev->ports = calloc(numthreads, sizeof(int));
-    if(ev->ports == NULL) {
-        free(ev);
-        return NULL;
-    }
-    ev->nports = numthreads;
-    ev->lp = 0;
-    
-    /* create ports event threads */
-    for(int i=0;i<numthreads;i++) {
-        /* create port */
-        ev->ports[i] = port_create();
-        if(ev->ports[i] == 0) {
-            free(ev->ports);
-            free(ev);
+    for(int i=0;i<cfg->nthreads;i++) {
+        EventHandler *handler = malloc(sizeof(EventHandler));
+        ev->instances[i] = handler;
+        
+        handler->port = port_create();
+        if(handler->port == 0) {
+            // TODO: error
             return NULL;
         }
         
-        /*
-         * start a new handler thread
-         * the thread needs the event port and a pointer to the event handler
-         */
-        ev_thr_conf_t *conf = malloc(sizeof(ev_thr_conf_t));
-        if(conf == NULL) {
-            free(ev->ports);
-            free(ev);
-            return NULL;
-        }
-        conf->handler = ev;
-        conf->port = ev->ports[i];
-        
-        systhread_start(0, 0, (thrstartfunc)ev_handle_events, conf);
-        // TODO: error handling
+        SYS_THREAD t = systhread_start(
+                0,
+                0,
+                (thrstartfunc)ev_handle_events,
+                handler);
+        systhread_detach(t);
     }
     
     return ev;
 }
 
-void ev_handle_events(ev_thr_conf_t *conf) {
-    event_handler_t *ev = conf->handler;
-    int port = conf->port;
-    
-    free(conf);
-    
-    port_event_t events[16];
+void ev_handle_events(EventHandler *ev) {   
+    port_event_t events[64];
     struct timespec timeout;
     timeout.tv_nsec = 0;
     timeout.tv_sec = 600;
@@ -90,7 +70,7 @@
     for(;;) {
         // wait for events
         uint_t nev = 1;
-        int ret = port_getn(port, events, 16, &nev, &timeout);
+        int ret = port_getn(ev->port, events, 64, &nev, &timeout);
         if(ret == -1) {
             // TODO: check for error
             perror("port_getn");
@@ -98,85 +78,141 @@
         }
         
         for(int i=0;i<nev;i++) {
-            event_t *event = events[i].portev_user;
-            if(event->fn) {
-                int saved_ev = event->poll;
-                if(event->fn(ev, event)) {
-                    /*
-                     * on solaris we have to reassociate the fd after
-                     * each event
-                     * we do this if the event function returns 1
-                     */
-                    
-                    if(event->poll != saved_ev) {
-                        // event type changed
-                        int ne = 0;
-                        if((event->poll & EVENT_POLLIN) == EVENT_POLLIN) {
-                            ne |= POLLIN;
-                        }
-                        if((event->poll & EVENT_POLLOUT) == EVENT_POLLOUT) {
-                            ne |= POLLOUT;
+            Event *event = events[i].portev_user;
+            if(events[i].portev_source == PORT_SOURCE_AIO) {
+                aiocb_t *aiocb = (aiocb_t*)events[i].portev_object;
+                if(event) {
+                    aiocb_s *aio = (aiocb_s*)event->object;
+                    aio->result = aiocb->aio_resultp.aio_return;
+                    aio->result_errno = aiocb->aio_resultp.aio_errno;
+                    if(event->fn) {
+                        if(!event->fn(ev, event) && event->finish) {
+                            event->finish(ev, event);
                         }
                     }
-                    
-                    if(ev_poll(ev, event)) {
-                        perror("port_associate");
-                    }                 
-                } else if(event->finish) {
-                    event->finish(ev, event);
                 }
-            }
+                free(aiocb);  
+            } else {
+                if(event->fn) {
+                    if(event->fn(ev, event)) {
+                        /*
+                         * on solaris we have to reassociate the fd after
+                         * each event
+                         * we do this if the event function returns 1
+                         */
+                        if(port_associate(
+                                ev->port,
+                                PORT_SOURCE_FD,
+                                (uintptr_t)event->object,
+                                ev_convert2sys_events(event->events),
+                                event))
+                        {
+                            perror("port_associate");
+                        }                 
+                    } else if(event->finish) {
+                        event->finish(ev, event);
+                    }
+                }
+            }  
         }
     }
 }
 
-// returns a event handler port
-int ev_get_port(event_handler_t *h) {
-    int nps = h->nports;
-    if(nps == 1) {
-        return h->ports[0];
+int ev_convert2sys_events(int events) {
+    int e = 0;
+    if((events & EVENT_POLLIN) == EVENT_POLLIN) {
+        e |= POLLIN;
     }
-    
-    int cp = h->lp % nps;
-    atomic_inc_32(&h->lp);
-    
-    return h->ports[cp];
+    if((events & EVENT_POLLOUT) == EVENT_POLLOUT) {
+        e |= POLLOUT;
+    }
+    return e;
 }
 
-int ev_pollin(event_handler_t *h, int fd, event_t *event) {
+
+int ev_pollin(EventHandler *h, int fd, Event *event) {
     event->object = (intptr_t)fd;
-    event->events = POLLIN;
-    event->poll = EVENT_POLLIN;
+    event->events = EVENT_POLLIN;
     return port_associate(
-            ev_get_port(h),
+            h->port,
             PORT_SOURCE_FD,
             (uintptr_t)fd,
             POLLIN,
             event);
 }
 
-int ev_pollout(event_handler_t *h, int fd, event_t *event) {
+int ev_pollout(EventHandler *h, int fd, Event *event) {
     event->object = (intptr_t)fd;
-    event->events = POLLOUT;
-    event->poll = EVENT_POLLOUT;
+    event->events = EVENT_POLLOUT;
     return port_associate(
-            ev_get_port(h),
+            h->port,
             PORT_SOURCE_FD,
             (uintptr_t)fd,
             POLLOUT,
             event);
 }
 
-int ev_poll(event_handler_t *h, event_t *event) {
-    return port_associate(
-            ev_get_port(h),
-            PORT_SOURCE_FD,
-            event->object,
-            event->events,
-            event);
+int ev_remove_poll(EventHandler *h, int fd) {
+    return port_dissociate(h->port, PORT_SOURCE_FD, (uintptr_t)fd);
+}
+
+int event_send(EventHandler *h, Event *event) {
+    event->object = 0;
+    event->events = 0;
+    return port_send(h->port, 0, event);
 }
 
-int evt_send(event_handler_t *h, event_t *event) {
-    event->object = 0;
-    return port_send(ev_get_port(h), 0, event);
+static int ev_aio(int fd, aiocb_s *cb, WSBool read) {
+    EventHandler *ev = cb->evhandler;
+    if(!ev) {
+        return -1;
+    }
+    
+    aiocb_t *aiocb = malloc(sizeof(aiocb_t));
+    if(!aiocb) {
+        return -1;
+    }
+    ZERO(aiocb, sizeof(aiocb_t));
+    
+    aiocb->aio_fildes = fd;
+    aiocb->aio_buf = cb->buf;
+    aiocb->aio_nbytes = cb->nbytes;
+    aiocb->aio_offset = cb->offset;
+    
+    port_notify_t *portnotify = malloc(sizeof(port_notify_t));
+    if(!portnotify) {
+        free(aiocb);
+        return -1;
+    }
+    portnotify->portnfy_port = ev->port;
+    portnotify->portnfy_user = cb->event;
+    aiocb->aio_sigevent.sigev_notify = SIGEV_PORT;
+    aiocb->aio_sigevent.sigev_value.sival_ptr = portnotify;
+    
+    if(read) {
+        return aio_read(aiocb);
+    } else {
+        return aio_write(aiocb);
+    }
 }
+
+int ev_aioread(int fd, aiocb_s *cb) {
+    return ev_aio(fd, cb, TRUE);
+}
+
+int ev_aiowrite(int fd, aiocb_s *cb) {
+    return ev_aio(fd, cb, FALSE);
+}
+
+
+int event_pollin(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_IN, event);
+}
+
+int event_pollout(EventHandler *ev, SYS_NETFD fd, Event *event) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_OUT, event);
+}
+
+int event_removepoll(EventHandler *ev, SYS_NETFD fd) {
+    return ((IOStream*)fd)->poll(fd, ev, IO_POLL_NONE, NULL);
+}

mercurial