Mercurial > projects > dreactor
diff dreactor/core/Vat.d @ 5:f875a1f278b8
housekeeping
author | rick@minifunk |
---|---|
date | Tue, 08 Jul 2008 12:16:07 -0400 |
parents | dreactor/core/SelectLoop.d@e3dbc9208822 |
children | 287ba7de97c4 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/Vat.d Tue Jul 08 12:16:07 2008 -0400 @@ -0,0 +1,162 @@ +/******************************************************************************* + + copyright: Copyright (c) 2008 Rick Richardson. All rights reserved + + license: BSD style: $(LICENSE) + + version: Initial release v0.1 : May 2008 + + author: Rick Richardson + +*******************************************************************************/ + +module dreactor.core.Vat; + +import tango.io.selector.Selector; +import tango.io.selector.model.ISelector; +import tango.core.Exception; +import tango.core.Thread; +import tango.core.Atomic; +import tango.util.collection.LinkSeq; +import tango.util.log.Log; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.ConnectionHandler; +import dreactor.util.ThreadSafeQueue; + +Logger log; + +enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; + +static char[] version_string = "Vat.d 0.1 2008-05-31"; + +class Vat +{ +private + Thread thread; + bool running; + Atomic!(int) pending; + + ThreadSafeQueue!(ConnectionHandler) freshList; + ThreadSafeQueue!(ConnectionHandler) remList; +public + this() + { + freshList = new ThreadSafeQueue!(ConnectionHandler); + remList = new ThreadSafeQueue!(ConnectionHandler); + log = Log.lookup("dreactor.core.Vat"); + } + + void run() + { + running = true; + thread = new Thread(&eventLoop); + thread.start(); + } + + void exit() + { + running = false; + } + + bool addConnection(ConnectionHandler handler) + { + log.trace("adding handler"); + return freshList.push(handler); + } + + bool remConnection(ConnectionHandler handler) + { + return remList.push(handler); + } + +private + void eventLoop() + { + auto selector = new Selector(); + selector.open(); + do + { + auto eventCount = selector.select(0.01); + + if (eventCount > 0) + { + // process events + foreach (SelectionKey key; selector.selectedSet()) + { + if (key.isReadable()) + { + // incoming data + log.trace("Read event fired"); + auto conn = cast(ConnectionHandler) key.attachment; + if ( ConnectionHandler.State.listening == conn.getState() ) + conn.handleConnection(conn.transport, &addConnection); + else + processReturn(conn.handleIncoming(), selector, conn); + } + else if (key.isWritable()) + { + log.trace("Write event fired"); + auto conn = cast(ConnectionHandler) key.attachment; + processReturn(conn.handleOutgoing(), selector, conn); + } + else if (key.isHangup()) + { + log.trace("Hangup event fired"); + auto conn = cast(ConnectionHandler) key.attachment; + processReturn(conn.handleDisconnect(), selector, conn); + } + else if (key.isError() || key.isInvalidHandle()) + { + log.trace("Error event fired"); + // error, close connection + auto conn = cast(ConnectionHandler) key.attachment; + conn.handleError(&remConnection); + } + } + } + else if (eventCount == 0) + { + /* can't think of anything useful to do here. */ + } + else + { + log.error("Selector.select returned {}", eventCount); + } + //add Conduits to listener + freshList.processAll( (ref ConnectionHandler h) + { + selector.reregister(h.transport, h.events(), h); + return 1; + }); + remList.processAll( (ref ConnectionHandler h) + { + selector.unregister(h.transport); + return 1; + }); + + } while (running) + + } + + void processReturn(int result, Selector s, ConnectionHandler h) + { + switch(result) + { + case UNREGISTER: + s.unregister(h.transport); + break; + case REMAIN: + //this space intentially left blank + break; + case REGISTER: + s.register(h.transport, h.events(), h); + break; + case REREGISTER: + s.reregister(h.transport, h.events(), h); + break; + default: + log.error("processReturn: unknown return value"); + } + } +}