Mercurial > projects > dreactor
view asyncdreactor/core/AsyncVat.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | dreactor/core/AsyncVat.d@e75a2e506b1d |
children |
line wrap: on
line source
/*******************************************************************************

        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Initial release v0.1 : May 2008

        author:         Rick Richardson

*******************************************************************************/

module dreactor.core.Vat;

import tango.io.selector.Selector;
import tango.io.selector.model.ISelector;
import tango.core.Exception;
import tango.core.Thread;
import tango.core.Atomic;
import tango.util.collection.LinkSeq;
import tango.util.log.Log;

import dreactor.transport.AsyncSocketConduit;
import dreactor.core.Dispatcher;
import dreactor.util.ThreadSafeQueue;

/// Module-level logger; it is (re)assigned every time a Vat is constructed.
Logger log;

/*******************************************************************************

        Result codes understood by Vat.processReturn. The event loop feeds it
        the values returned by a Dispatcher's handleIncoming / handleOutgoing /
        handleDisconnect:

        CLOSE       unregister the transport from the selector and detach it
        UNREGISTER  unregister the transport from the selector
        REMAIN      leave the registration untouched
        REGISTER    register the transport with the handler's current events
        REREGISTER  handled exactly like REGISTER

*******************************************************************************/
enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};

static char[] version_string = "Vat.d 0.1 2008-05-31";

/*******************************************************************************

        Runs a selector-driven event loop on its own thread and forwards
        read / write / hangup / error events to the Dispatcher attached to each
        selection key.

        Connections are added and removed through two queues (freshList and
        remList) that the loop drains once per iteration.
        NOTE(review): cross-thread use of addConnection / remConnection relies
        on ThreadSafeQueue being safe for concurrent push -- confirm in
        dreactor.util.ThreadSafeQueue.

*******************************************************************************/
class Vat
{
    private Thread thread;
    bool running;
    Atomic!(int) pending;   // not referenced anywhere in this module

    ThreadSafeQueue!(Dispatcher) freshList;   // handlers waiting to be registered
    ThreadSafeQueue!(Dispatcher) remList;     // handlers waiting to be unregistered

    /***************************************************************************

            Creates the two hand-off queues and looks up the module logger.

    ***************************************************************************/
    public this()
    {
        freshList = new ThreadSafeQueue!(Dispatcher);
        remList = new ThreadSafeQueue!(Dispatcher);
        log = Log.lookup("dreactor.core.Vat");
    }

    /***************************************************************************

            Marks the vat as running and starts the event loop on a new thread.

    ***************************************************************************/
    void run()
    {
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /***************************************************************************

            Asks the event loop to stop. The loop tests the flag at the end of
            each iteration, so it stops after the iteration in progress.

    ***************************************************************************/
    void exit()
    {
        running = false;
    }

    /***************************************************************************

            Blocks until the event-loop thread has finished. Does nothing when
            run() has not been called, instead of dereferencing a null thread.

    ***************************************************************************/
    void wait()
    {
        if (thread !is null)
            thread.join();
    }

    /***************************************************************************

            Queues a handler for registration with the selector.

            Returns: whatever freshList.push returns.

    ***************************************************************************/
    bool addConnection(Dispatcher handler)
    {
        log.trace("adding handler");
        return freshList.push(handler);
    }

    /***************************************************************************

            Queues a handler for removal from the selector.

            Returns: whatever remList.push returns.

    ***************************************************************************/
    bool remConnection(Dispatcher handler)
    {
        return remList.push(handler);
    }

    /***************************************************************************

            Body of the vat thread: polls the selector, dispatches every
            selected key to its attached Dispatcher, then drains the add /
            remove queues. Repeats until `running` is cleared.

    ***************************************************************************/
    private void eventLoop()
    {
        auto selector = new Selector();
        selector.open();

        // Release the selector's OS resources however the loop ends (normal
        // exit or an exception escaping from a handler).
        scope(exit) selector.close();

        do
        {
            // Short timeout (presumably seconds -- confirm against Tango's
            // ISelector.select) so the queues below and the `running` flag
            // are checked regularly even when no I/O is pending.
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                // NOTE(review): only the first matching event kind is handled
                // per key and per pass; a key that is both readable and
                // writable gets its write handled on a later pass. Confirm
                // this is intended.
                foreach (SelectionKey key; selector.selectedSet())
                {
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");
                        auto conn = cast(Dispatcher) key.attachment;

                        // A listening handler accepts instead of reading; it
                        // receives addConnection so it can queue the new
                        // connection on this vat.
                        if ( Dispatcher.State.listening == conn.getState() )
                            conn.handleConnection(conn.transport, &addConnection);
                        else
                            processReturn(conn.handleIncoming(), selector, conn);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");
                        auto conn = cast(Dispatcher) key.attachment;
                        processReturn(conn.handleOutgoing(), selector, conn);
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        auto conn = cast(Dispatcher) key.attachment;
                        processReturn(conn.handleDisconnect(), selector, conn);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired");
                        // error, close connection
                        auto conn = cast(Dispatcher) key.attachment;
                        conn.handleError(&remConnection);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* can't think of anything useful to do here. */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

            // add queued Conduits to the selector
            freshList.processAll( (ref Dispatcher h)
            {
                selector.register(h.transport, h.events(), h);
                return 1;
            });

            // drop Conduits queued for removal
            remList.processAll( (ref Dispatcher h)
            {
                selector.unregister(h.transport);
                return 1;
            });

        } while (running);
    }

    /***************************************************************************

            Applies a handler's result code to the selector.

            Params:
                result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
                s      = selector the handler's transport belongs to
                h      = handler that produced the result

            Any other value is logged as an error and otherwise ignored.

    ***************************************************************************/
    void processReturn(int result, Selector s, Dispatcher h)
    {
        switch(result)
        {
            case CLOSE:
                s.unregister(h.transport);
                h.transport.detach();
                break;
            case UNREGISTER:
                s.unregister(h.transport);
                break;
            case REMAIN:
                // this space intentionally left blank
                break;
            case REGISTER:
                s.register(h.transport, h.events(), h);
                break;
            case REREGISTER:
                // same call as REGISTER; h.events() supplies the current mask
                s.register(h.transport, h.events(), h);
                break;
            default:
                log.error("processReturn: unknown return value");
        }
    }
}