Mercurial > projects > dreactor
changeset 10:e75a2e506b1d
housekeeping
author | rick@minifunk |
---|---|
date | Fri, 01 Aug 2008 16:30:45 -0400 |
parents | 5412a1ff2e49 |
children | 5836613d16ac |
files | dreactor/core/AsyncVat.d dreactor/core/Vat.d dreactor/protocol/Http11.d dreactor/protocol/RawTcp.d test/chatserver.d test/test |
diffstat | 6 files changed, 324 insertions(+), 268 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/AsyncVat.d Fri Aug 01 16:30:45 2008 -0400 @@ -0,0 +1,171 @@ +/******************************************************************************* + + copyright: Copyright (c) 2008 Rick Richardson. All rights reserved + + license: BSD style: $(LICENSE) + + version: Initial release v0.1 : May 2008 + + author: Rick Richardson + +*******************************************************************************/ + +module dreactor.core.Vat; + +import tango.io.selector.Selector; +import tango.io.selector.model.ISelector; +import tango.core.Exception; +import tango.core.Thread; +import tango.core.Atomic; +import tango.util.collection.LinkSeq; +import tango.util.log.Log; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.Dispatcher; +import dreactor.util.ThreadSafeQueue; + +Logger log; + +enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; + +static char[] version_string = "Vat.d 0.1 2008-05-31"; + +class Vat +{ +private + Thread thread; + bool running; + Atomic!(int) pending; + + ThreadSafeQueue!(Dispatcher) freshList; + ThreadSafeQueue!(Dispatcher) remList; +public + this() + { + freshList = new ThreadSafeQueue!(Dispatcher); + remList = new ThreadSafeQueue!(Dispatcher); + log = Log.lookup("dreactor.core.Vat"); + } + + void run() + { + running = true; + thread = new Thread(&eventLoop); + thread.start(); + } + + void exit() + { + running = false; + } + + void wait() + { + thread.join(); + } + + bool addConnection(Dispatcher handler) + { + log.trace("adding handler"); + return freshList.push(handler); + } + + bool remConnection(Dispatcher handler) + { + return remList.push(handler); + } + +private + void eventLoop() + { + auto selector = new Selector(); + selector.open(); + do + { + auto eventCount = selector.select(0.01); + + if (eventCount > 0) + { + // process events + foreach (SelectionKey key; selector.selectedSet()) + { + if (key.isReadable()) + { + // incoming data + log.trace("Read event fired"); + auto conn = cast(Dispatcher) key.attachment; + if ( Dispatcher.State.listening == conn.getState() ) + conn.handleConnection(conn.transport, &addConnection); + else + processReturn(conn.handleIncoming(), selector, conn); + } + else if (key.isWritable()) + { + log.trace("Write event fired"); + auto conn = cast(Dispatcher) key.attachment; + processReturn(conn.handleOutgoing(), selector, conn); + } + else if (key.isHangup()) + { + log.trace("Hangup event fired"); + auto conn = cast(Dispatcher) key.attachment; + processReturn(conn.handleDisconnect(), selector, conn); + } + else if (key.isError() || key.isInvalidHandle()) + { + log.trace("Error event fired"); + // error, close connection + auto conn = cast(Dispatcher) key.attachment; + conn.handleError(&remConnection); + } + } + } + else if (eventCount == 0) + { + /* can't think of anything useful to do here. */ + } + else + { + log.error("Selector.select returned {}", eventCount); + } + //add Conduits to listener + freshList.processAll( (ref Dispatcher h) + { + selector.register(h.transport, h.events(), h); + return 1; + }); + remList.processAll( (ref Dispatcher h) + { + selector.unregister(h.transport); + return 1; + }); + + } while (running) + + } + + void processReturn(int result, Selector s, Dispatcher h) + { + switch(result) + { + case CLOSE: + s.unregister(h.transport); + h.transport.detach(); + break; + case UNREGISTER: + s.unregister(h.transport); + break; + case REMAIN: + //this space intentially left blank + break; + case REGISTER: + s.register(h.transport, h.events(), h); + break; + case REREGISTER: + s.register(h.transport, h.events(), h); + break; + default: + log.error("processReturn: unknown return value"); + } + } +}
--- a/dreactor/core/Vat.d Sat Jul 12 10:42:41 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,171 +0,0 @@ -/******************************************************************************* - - copyright: Copyright (c) 2008 Rick Richardson. All rights reserved - - license: BSD style: $(LICENSE) - - version: Initial release v0.1 : May 2008 - - author: Rick Richardson - -*******************************************************************************/ - -module dreactor.core.Vat; - -import tango.io.selector.Selector; -import tango.io.selector.model.ISelector; -import tango.core.Exception; -import tango.core.Thread; -import tango.core.Atomic; -import tango.util.collection.LinkSeq; -import tango.util.log.Log; - -import dreactor.transport.AsyncSocketConduit; -import dreactor.core.Dispatcher; -import dreactor.util.ThreadSafeQueue; - -Logger log; - -enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; - -static char[] version_string = "Vat.d 0.1 2008-05-31"; - -class Vat -{ -private - Thread thread; - bool running; - Atomic!(int) pending; - - ThreadSafeQueue!(Dispatcher) freshList; - ThreadSafeQueue!(Dispatcher) remList; -public - this() - { - freshList = new ThreadSafeQueue!(Dispatcher); - remList = new ThreadSafeQueue!(Dispatcher); - log = Log.lookup("dreactor.core.Vat"); - } - - void run() - { - running = true; - thread = new Thread(&eventLoop); - thread.start(); - } - - void exit() - { - running = false; - } - - void wait() - { - thread.join(); - } - - bool addConnection(Dispatcher handler) - { - log.trace("adding handler"); - return freshList.push(handler); - } - - bool remConnection(Dispatcher handler) - { - return remList.push(handler); - } - -private - void eventLoop() - { - auto selector = new Selector(); - selector.open(); - do - { - auto eventCount = selector.select(0.01); - - if (eventCount > 0) - { - // process events - foreach (SelectionKey key; selector.selectedSet()) - { - if (key.isReadable()) - { - // incoming data - log.trace("Read event fired"); - auto conn = cast(Dispatcher) key.attachment; - if ( Dispatcher.State.listening == conn.getState() ) - conn.handleConnection(conn.transport, &addConnection); - else - processReturn(conn.handleIncoming(), selector, conn); - } - else if (key.isWritable()) - { - log.trace("Write event fired"); - auto conn = cast(Dispatcher) key.attachment; - processReturn(conn.handleOutgoing(), selector, conn); - } - else if (key.isHangup()) - { - log.trace("Hangup event fired"); - auto conn = cast(Dispatcher) key.attachment; - processReturn(conn.handleDisconnect(), selector, conn); - } - else if (key.isError() || key.isInvalidHandle()) - { - log.trace("Error event fired"); - // error, close connection - auto conn = cast(Dispatcher) key.attachment; - conn.handleError(&remConnection); - } - } - } - else if (eventCount == 0) - { - /* can't think of anything useful to do here. */ - } - else - { - log.error("Selector.select returned {}", eventCount); - } - //add Conduits to listener - freshList.processAll( (ref Dispatcher h) - { - selector.register(h.transport, h.events(), h); - return 1; - }); - remList.processAll( (ref Dispatcher h) - { - selector.unregister(h.transport); - return 1; - }); - - } while (running) - - } - - void processReturn(int result, Selector s, Dispatcher h) - { - switch(result) - { - case CLOSE: - s.unregister(h.transport); - h.transport.detach(); - break; - case UNREGISTER: - s.unregister(h.transport); - break; - case REMAIN: - //this space intentially left blank - break; - case REGISTER: - s.register(h.transport, h.events(), h); - break; - case REREGISTER: - s.register(h.transport, h.events(), h); - break; - default: - log.error("processReturn: unknown return value"); - } - } -}
--- a/dreactor/protocol/Http11.d Sat Jul 12 10:42:41 2008 -0400 +++ b/dreactor/protocol/Http11.d Fri Aug 01 16:30:45 2008 -0400 @@ -1,5 +1,43 @@ -module dreactor.protocol.Http11.d; +module dreactor.protocol.Http11; + +import dreactor.protocol.RawTcp; +import dreactor.protocol.http11_parser; + +class HttpListener +{ +public + this(Vat sel, IPv4Address addr) + { + listener = new RawTCPListener(sel, IPv4Address addr); + parser = new Http11Parser(); + listener.setDataHandler(&onData); + } + + private int onData(char[] buffer) + { + parser.execute(buffer); + } -class Http + +private + RawTCPListener listener; + Http11Parser parser; +} + +class HttpClient { +public + this(Vat sel) + { + client = new RawTCPClient(sel); + client.setDataHandler(&onData); + } + + private int onData() + { + + } +private + RawTCPClient client; } +
--- a/dreactor/protocol/RawTcp.d Sat Jul 12 10:42:41 2008 -0400 +++ b/dreactor/protocol/RawTcp.d Fri Aug 01 16:30:45 2008 -0400 @@ -16,19 +16,18 @@ Basic TCP server or client routines for sending raw data. ******************************************************************************/ -class RawTCPListener : RawTCPHandler +class RawTCPListener { public - Logger log; this(Dispatcher mgr, Vat sel, IPv4Address addr) { manager = mgr; mgr.events(Event.Read); - mgr.setOutgoingHandler(&onSend); + mgr.setOutgoingHandler(&RawTCPHandler.onSend); mgr.setIncomingHandler(&onReceive); mgr.setConnectHandler(&accept); - mgr.setErrorHandler(&onError); - mgr.setDisconnectHandler(&onHangup); + mgr.setErrorHandler(&RawTCPHandler.onError); + mgr.setDisconnectHandler(&RawTCPHandler.onHangup); mgr.listen(addr); sel.addConnection(mgr); @@ -47,13 +46,7 @@ ~this() { - foreach(Dispatcher d; children) - { - (cast(AsyncSocketConduit)d.transport).shutdown(); - (cast(AsyncSocketConduit)d.transport).detach(); - } - (cast(AsyncSocketConduit)manager.transport).shutdown(); - (cast(AsyncSocketConduit)manager.transport).detach(); + close(); } int accept(Conduit cond, RegisterD reg) @@ -96,7 +89,6 @@ if (d.appendOutBuffer(outbuf)) { d.addEvent(Event.Write); - d.setOutgoingHandler(&onSend); if (!vat.addConnection(d)) { log.error("unable to register mgr"); @@ -105,7 +97,14 @@ return 0; } - public int onReceive(Dispatcher h) + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + int onReceive(Dispatcher h) { Logger log = Log.lookup("Handlers.onReceive"); @@ -133,28 +132,43 @@ void close() { + foreach(Dispatcher d; children) + { + (cast(AsyncSocketConduit)d.transport).shutdown(); + (cast(AsyncSocketConduit)d.transport).detach(); + } + (cast(AsyncSocketConduit)manager.transport).shutdown(); + (cast(AsyncSocketConduit)manager.transport).detach(); } + + void setDataHandler(void delegate(char[], Dispatcher) h) + { + dataHandler = h; + } private - Dispatcher manager; Vat vat; CircularSeq!(Dispatcher) children; + Dispatcher manager; + Logger log; + RawTCPHandler h; + void delegate(char[], Dispatcher) dataHandler; } -class RawTCPClient : RawTCPHandler +class RawTCPClient { + public - Logger log; this(Dispatcher mgr, Vat sel, Event evts = Event.Read) { manager = mgr; manager.events(evts); connected = false; - mgr.setOutgoingHandler(&onSend); + mgr.setOutgoingHandler(&RawTCPHandler.onSend); mgr.setIncomingHandler(&onReceive); - mgr.setErrorHandler(&onError); - mgr.setDisconnectHandler(&onHangup); + mgr.setErrorHandler(&RawTCPHandler.onError); + mgr.setDisconnectHandler(&RawTCPHandler.onHangup); vat = sel; log = Log.lookup("dreactor.protocol.RawTcpClient"); } @@ -214,10 +228,48 @@ return 0; } + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + public int onReceive(Dispatcher h) + { + Logger log = Log.lookup("Handlers.onReceive"); + + char inbuf[8192]; + int amt; + if((amt = h.transport.read(inbuf)) > 0) + { + if (dataHandler) + dataHandler(inbuf[0 .. amt], h); + else + log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); + } + else + { + if (amt == 0) + { + return CLOSE; + } + log.error("Received no data, err = {}", amt); + } + return REMAIN; + } + + void setDataHandler(void delegate(char[], Dispatcher) h) + { + dataHandler = h; + } private + void delegate(char[], Dispatcher) dataHandler; Dispatcher manager; Vat vat; bool connected; + Logger log; + RawTCPHandler h; } @@ -226,94 +278,60 @@ Default Event handlers common to both listener/clients ******************************************************************************/ -class RawTCPHandler -{ -/************************************************************************** - - onSend - OutgoingHandlerD - To be registered as the response to socket writable event. - Sends data, returns amount sent. Unregisters Handler for sending - if there is no more data left to send. - -***************************************************************************/ -public int onSend(Dispatcher h) +struct RawTCPHandler { - Logger log = Log.lookup("Handlers.onSend"); - - char[] outbuf = h.nextBuffer(); - if (outbuf !is null) + /************************************************************************** + + onSend + OutgoingHandlerD + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + + ***************************************************************************/ + public static int onSend(Dispatcher h) { - int sent = h.transport.write(outbuf); - if (sent > 0) + Logger log = Log.lookup("Handlers.onSend"); + + char[] outbuf = h.nextBuffer(); + if (outbuf !is null) { - if (! h.addOffset(sent)) + int sent = h.transport.write(outbuf); + if (sent > 0) { - h.remEvent(Event.Write); - return REREGISTER; + if (! h.addOffset(sent)) + { + h.remEvent(Event.Write); + return REREGISTER; + } } - } - else if (sent == AsyncSocketConduit.Eof) - { - log.error("Select said socket was writable, but sent 0 bytes"); + else if (sent == AsyncSocketConduit.Eof) + { + log.error("Select said socket was writable, but sent 0 bytes"); + } + else + { + log.error("Socket send return ERR"); + } + return REMAIN; } else { - log.error("Socket send return ERR"); + h.remEvent(Event.Write); + return REREGISTER; } - return REMAIN; } - else - { - h.remEvent(Event.Write); - return REREGISTER; - } -} -/************************************************************************** - receive - IncomingHandlerD - Default incoming data handler. Should be replaced with something useful. - -**************************************************************************/ -public int onReceive(Dispatcher h) -{ - Logger log = Log.lookup("Handlers.onReceive"); - - char inbuf[8192]; - int amt; - if((amt = h.transport.read(inbuf)) > 0) + static int onHangup(Dispatcher d) { - if (dataHandler) - dataHandler(inbuf[0 .. amt], h); - else - log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); - } - else - { - if (amt == 0) - { - return CLOSE; - } - log.error("Received no data, err = {}", amt); + return UNREGISTER; } - return REMAIN; -} -int onHangup(Dispatcher d) -{ - return UNREGISTER; -} -int onError(Dispatcher d, RegisterD unreg) -{ - return CLOSE; -} -void setDataHandler(void delegate(char[],Dispatcher) del) -{ - dataHandler = del; -} -protected - void delegate(char[], Dispatcher) dataHandler; + static int onError(Dispatcher d, RegisterD unreg) + { + return CLOSE; + } + } bool includes(Dispatcher[] haystack, Dispatcher needle)
--- a/test/chatserver.d Sat Jul 12 10:42:41 2008 -0400 +++ b/test/chatserver.d Fri Aug 01 16:30:45 2008 -0400 @@ -10,13 +10,13 @@ import dreactor.protocol.RawTcp; import dreactor.transport.AsyncSocketConduit; -int count; int main() { Vat l_vat = new Vat(); Logger log = Log.lookup("dreactor.chatserver"); Log.root.level(log.Level.Info, true); RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555)); + listener.setDataHandler( (char[] inbuf, Dispatcher d) { listener.broadcast(inbuf, [d]);