Mercurial > projects > dreactor
view dreactor/core/SelectLoop.d @ 3:e3dbc9208822
basic tests working
author | rick@minifunk |
---|---|
date | Tue, 08 Jul 2008 11:21:09 -0400 |
parents | d3374d553986 |
children |
line wrap: on
line source
/*******************************************************************************

        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Initial release v0.1 : May 2008

        author:         Rick Richardson

*******************************************************************************/

module dreactor.core.SelectLoop;

import tango.io.selector.Selector;
import tango.io.selector.model.ISelector;
import tango.core.Exception;
import tango.core.Thread;
import tango.core.Atomic;
import tango.util.collection.LinkSeq;
import tango.util.log.Log;

import dreactor.transport.AsyncSocketConduit;
import dreactor.core.ConnectionHandler;
import dreactor.util.ThreadSafeQueue;

/// Module-level logger; assigned in the SelectLoop constructor.
Logger log;

/**
 * Result codes that processReturn() maps onto selector operations.
 * They are the values passed in from ConnectionHandler.handleIncoming(),
 * handleOutgoing() and handleDisconnect().
 */
enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};

static char[] version_string = "SelectLoop.d 0.1 2008-05-31";

/**
 * Runs a Selector-based event loop on its own thread and dispatches
 * read / write / hangup / error events to the ConnectionHandler attached
 * to each selection key.
 *
 * Handlers are added and removed through two queues (freshList, remList)
 * that the loop drains once per iteration, so the selector itself is only
 * touched from the loop thread.
 */
class SelectLoop
{
    private Thread thread;

    /// Loop-continue flag: set by run(), cleared by exit().
    bool running;

    /// Not used anywhere in this class.
    Atomic!(int) pending;

    /// Handlers waiting to be (re)registered with the selector.
    ThreadSafeQueue!(ConnectionHandler) freshList;

    /// Handlers waiting to be unregistered from the selector.
    ThreadSafeQueue!(ConnectionHandler) remList;

    /// Creates the two hand-off queues and looks up the module logger.
    public this()
    {
        freshList = new ThreadSafeQueue!(ConnectionHandler);
        remList = new ThreadSafeQueue!(ConnectionHandler);
        log = Log.lookup("dreactor.core.SelectLoop");
    }

    /// Marks the loop as running and starts eventLoop() on a new thread.
    void run()
    {
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /**
     * Asks the event loop to stop. The flag is only checked at the end of
     * each loop iteration, and this call does not wait for the thread.
     */
    void exit()
    {
        running = false;
    }

    /**
     * Queues a handler for registration with the selector.
     *
     * Returns: whatever ThreadSafeQueue.push returns for the handler.
     */
    bool addConnection(ConnectionHandler handler)
    {
        log.trace("adding handler");
        return freshList.push(handler);
    }

    /**
     * Queues a handler for removal from the selector.
     *
     * Returns: whatever ThreadSafeQueue.push returns for the handler.
     */
    bool remConnection(ConnectionHandler handler)
    {
        return remList.push(handler);
    }

    /**
     * Thread body: opens a selector, then repeatedly waits for events,
     * dispatches them, and drains the add/remove queues until `running`
     * is cleared.
     */
    private void eventLoop()
    {
        auto selector = new Selector();
        selector.open();
        // Release the selector's resources however the loop ends
        // (exit() or an exception thrown by a handler). Previously the
        // selector was opened but never closed.
        scope(exit) selector.close();

        do
        {
            // Short timeout (presumably seconds -- confirm against the Tango
            // version in use) so the queues below are drained regularly.
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                // Only the first matching condition is handled for each key.
                foreach (SelectionKey key; selector.selectedSet())
                {
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");
                        auto conn = cast(ConnectionHandler) key.attachment;
                        // A listening handler accepts a new connection and is
                        // given addConnection as a callback; any other handler
                        // has data to read.
                        if ( ConnectionHandler.State.listening == conn.getState() )
                            conn.handleConnection(conn.transport, &addConnection);
                        else
                            processReturn(conn.handleIncoming(), selector, conn);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");
                        auto conn = cast(ConnectionHandler) key.attachment;
                        processReturn(conn.handleOutgoing(), selector, conn);
                    }
                    else if (key.isHangup())
                    {
                        auto conn = cast(ConnectionHandler) key.attachment;
                        processReturn(conn.handleDisconnect(), selector, conn);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired");
                        // error, close connection
                        auto conn = cast(ConnectionHandler) key.attachment;
                        conn.handleError(&remConnection);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* can't think of anything useful to do here. */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

            //add Conduits to listener
            freshList.processAll( (ref ConnectionHandler h)
            {
                log.trace("reregistering transport for event {}", h.events());
                selector.reregister(h.transport, h.events(), h);
                return 1;
            });

            // drop Conduits that were queued for removal
            remList.processAll( (ref ConnectionHandler h)
            {
                selector.unregister(h.transport);
                return 1;
            });

        } while (running)

        log.trace("done with while loop");
    }

    /**
     * Applies a handler's result code to the selector.
     *
     * Params:
     *     result = one of UNREGISTER, REMAIN, REGISTER, REREGISTER
     *     s      = selector to update
     *     h      = handler whose transport and events() are (un)registered
     *
     * Any other value is logged as an error and otherwise ignored.
     */
    void processReturn(int result, Selector s, ConnectionHandler h)
    {
        switch(result)
        {
            case UNREGISTER:
                s.unregister(h.transport);
                break;
            case REMAIN:
                //this space intentionally left blank
                break;
            case REGISTER:
                s.register(h.transport, h.events(), h);
                break;
            case REREGISTER:
                s.reregister(h.transport, h.events(), h);
                break;
            default:
                log.error("unknown return value");
        }
    }
}