Mercurial > projects > dreactor
changeset 5:f875a1f278b8
housekeeping
author | rick@minifunk |
---|---|
date | Tue, 08 Jul 2008 12:16:07 -0400 |
parents | f8b01c9f7114 |
children | 287ba7de97c4 |
files | dreactor/core/ConnectionHandler.d dreactor/core/SelectLoop.d dreactor/core/Vat.d dreactor/protocol/Raw.d dreactor/protocol/RawTcp.d dsss.conf test/test test/test.d |
diffstat | 8 files changed, 185 insertions(+), 313 deletions(-) [+] |
line wrap: on
line diff
--- a/dreactor/core/ConnectionHandler.d Tue Jul 08 11:22:39 2008 -0400 +++ b/dreactor/core/ConnectionHandler.d Tue Jul 08 12:16:07 2008 -0400 @@ -274,9 +274,7 @@ } void addEvent(Event e) { - log.trace("events_ before: {}", events_); events_ |= e; - log.trace("events_ after: {}", events_); } void remEvent(Event e) {
--- a/dreactor/core/SelectLoop.d Tue Jul 08 11:22:39 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,163 +0,0 @@ -/******************************************************************************* - - copyright: Copyright (c) 2008 Rick Richardson. All rights reserved - - license: BSD style: $(LICENSE) - - version: Initial release v0.1 : May 2008 - - author: Rick Richardson - -*******************************************************************************/ - -module dreactor.core.SelectLoop; - -import tango.io.selector.Selector; -import tango.io.selector.model.ISelector; -import tango.core.Exception; -import tango.core.Thread; -import tango.core.Atomic; -import tango.util.collection.LinkSeq; -import tango.util.log.Log; - -import dreactor.transport.AsyncSocketConduit; -import dreactor.core.ConnectionHandler; -import dreactor.util.ThreadSafeQueue; - -Logger log; - -enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; - -static char[] version_string = "SelectLoop.d 0.1 2008-05-31"; - -class SelectLoop -{ -private - Thread thread; - bool running; - Atomic!(int) pending; - - ThreadSafeQueue!(ConnectionHandler) freshList; - ThreadSafeQueue!(ConnectionHandler) remList; -public - this() - { - freshList = new ThreadSafeQueue!(ConnectionHandler); - remList = new ThreadSafeQueue!(ConnectionHandler); - log = Log.lookup("dreactor.core.SelectLoop"); - } - - void run() - { - running = true; - thread = new Thread(&eventLoop); - thread.start(); - } - - void exit() - { - running = false; - } - - bool addConnection(ConnectionHandler handler) - { - log.trace("adding handler"); - return freshList.push(handler); - } - - bool remConnection(ConnectionHandler handler) - { - return remList.push(handler); - } - -private - void eventLoop() - { - auto selector = new Selector(); - selector.open(); - do - { - auto eventCount = selector.select(0.01); - - if (eventCount > 0) - { - // process events - foreach (SelectionKey key; selector.selectedSet()) - { - if (key.isReadable()) - { - // incoming data - log.trace("Read event fired"); - auto conn = cast(ConnectionHandler) key.attachment; - if ( ConnectionHandler.State.listening == conn.getState() ) - conn.handleConnection(conn.transport, &addConnection); - else - processReturn(conn.handleIncoming(), selector, conn); - } - else if (key.isWritable()) - { - log.trace("Write event fired"); - auto conn = cast(ConnectionHandler) key.attachment; - processReturn(conn.handleOutgoing(), selector, conn); - } - else if (key.isHangup()) - { - auto conn = cast(ConnectionHandler) key.attachment; - processReturn(conn.handleDisconnect(), selector, conn); - } - else if (key.isError() || key.isInvalidHandle()) - { - log.trace("Error event fired"); - // error, close connection - auto conn = cast(ConnectionHandler) key.attachment; - conn.handleError(&remConnection); - } - } - } - else if (eventCount == 0) - { - /* can't think of anything useful to do here. */ - } - else - { - log.error("Selector.select returned {}", eventCount); - } - //add Conduits to listener - freshList.processAll( (ref ConnectionHandler h) - { - log.trace("reregistering transport for event {}", h.events()); - selector.reregister(h.transport, h.events(), h); - return 1; - }); - remList.processAll( (ref ConnectionHandler h) - { - selector.unregister(h.transport); - return 1; - }); - - } while (running) - - log.trace("done with while loop"); - } - - void processReturn(int result, Selector s, ConnectionHandler h) - { - switch(result) - { - case UNREGISTER: - s.unregister(h.transport); - break; - case REMAIN: - //this space intentially left blank - break; - case REGISTER: - s.register(h.transport, h.events(), h); - break; - case REREGISTER: - s.reregister(h.transport, h.events(), h); - break; - default: - log.error("unknown return value"); - } - } -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/Vat.d Tue Jul 08 12:16:07 2008 -0400 @@ -0,0 +1,162 @@ +/******************************************************************************* + + copyright: Copyright (c) 2008 Rick Richardson. All rights reserved + + license: BSD style: $(LICENSE) + + version: Initial release v0.1 : May 2008 + + author: Rick Richardson + +*******************************************************************************/ + +module dreactor.core.Vat; + +import tango.io.selector.Selector; +import tango.io.selector.model.ISelector; +import tango.core.Exception; +import tango.core.Thread; +import tango.core.Atomic; +import tango.util.collection.LinkSeq; +import tango.util.log.Log; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.ConnectionHandler; +import dreactor.util.ThreadSafeQueue; + +Logger log; + +enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; + +static char[] version_string = "Vat.d 0.1 2008-05-31"; + +class Vat +{ +private + Thread thread; + bool running; + Atomic!(int) pending; + + ThreadSafeQueue!(ConnectionHandler) freshList; + ThreadSafeQueue!(ConnectionHandler) remList; +public + this() + { + freshList = new ThreadSafeQueue!(ConnectionHandler); + remList = new ThreadSafeQueue!(ConnectionHandler); + log = Log.lookup("dreactor.core.Vat"); + } + + void run() + { + running = true; + thread = new Thread(&eventLoop); + thread.start(); + } + + void exit() + { + running = false; + } + + bool addConnection(ConnectionHandler handler) + { + log.trace("adding handler"); + return freshList.push(handler); + } + + bool remConnection(ConnectionHandler handler) + { + return remList.push(handler); + } + +private + void eventLoop() + { + auto selector = new Selector(); + selector.open(); + do + { + auto eventCount = selector.select(0.01); + + if (eventCount > 0) + { + // process events + foreach (SelectionKey key; selector.selectedSet()) + { + if (key.isReadable()) + { + // incoming data + log.trace("Read event fired"); + auto conn = cast(ConnectionHandler) key.attachment; + if ( ConnectionHandler.State.listening == conn.getState() ) + conn.handleConnection(conn.transport, &addConnection); + else + processReturn(conn.handleIncoming(), selector, conn); + } + else if (key.isWritable()) + { + log.trace("Write event fired"); + auto conn = cast(ConnectionHandler) key.attachment; + processReturn(conn.handleOutgoing(), selector, conn); + } + else if (key.isHangup()) + { + log.trace("Hangup event fired"); + auto conn = cast(ConnectionHandler) key.attachment; + processReturn(conn.handleDisconnect(), selector, conn); + } + else if (key.isError() || key.isInvalidHandle()) + { + log.trace("Error event fired"); + // error, close connection + auto conn = cast(ConnectionHandler) key.attachment; + conn.handleError(&remConnection); + } + } + } + else if (eventCount == 0) + { + /* can't think of anything useful to do here. */ + } + else + { + log.error("Selector.select returned {}", eventCount); + } + //add Conduits to listener + freshList.processAll( (ref ConnectionHandler h) + { + selector.reregister(h.transport, h.events(), h); + return 1; + }); + remList.processAll( (ref ConnectionHandler h) + { + selector.unregister(h.transport); + return 1; + }); + + } while (running) + + } + + void processReturn(int result, Selector s, ConnectionHandler h) + { + switch(result) + { + case UNREGISTER: + s.unregister(h.transport); + break; + case REMAIN: + //this space intentially left blank + break; + case REGISTER: + s.register(h.transport, h.events(), h); + break; + case REREGISTER: + s.reregister(h.transport, h.events(), h); + break; + default: + log.error("processReturn: unknown return value"); + } + } +}
--- a/dreactor/protocol/Raw.d Tue Jul 08 11:22:39 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,126 +0,0 @@ -module dreactor.protocol.Raw; - -import tango.io.Conduit; -import tango.io.selector.model.ISelector; -import dreactor.core.AsyncConduit; -import dreactor.core.SelectLoop; -import dreactor.core.ConnectionHandler; -import tango.util.collection.CircularSeq; -import tango.util.log.Log; -import tango.util.log.Configurator; - -Logger log = Log.getLogger("dreactor.core.SelectLoop"); - -/****************************************************************************** - - Basic TCP server or client routines for sending raw data. - -******************************************************************************/ -class RawListener -{ -public - - this(ConnectionHandler mgr, SelectLoop sel) - { - manager = mgr; - mgr.events(Event.Read); - sel.addConnection(mgr); - select = sel; - children = CircularSeq!(ConnectionHandler); - Configurator(); - } - - int accept(Conduit cond) - { - AsyncConduit newcond = new AsyncConduit; - cond.socket().accept(newcond.socket); - ConnectionHandler h = ConnectionHandler.New(manager); - mgr.events(Event.Read); - select.addConnection(mgr); - children.append(mgr); - } - - bool broadcast(char[] outbuf) - { - foreach(ConnectionHandler h; children) - { - if (h.appendBuffer(outbuf)) - { - h.addEvent(Event.Write); - select.addConnection(h); - } - } - } - - void close() - { - - } - - /************************************************************************** - - send - OutgoingHandlerD - To be registered as the response to socket writable event. - Sends data, returns amount sent. Unregisters Handler for sending - if there is no more data left to send. - - ***************************************************************************/ - int send(ConnectionHandler h, RegisterD reg) - { - char[] outbuf = h.nextBuffer(); - if (!outbuf is null) - { - int sent = h.transport.write(outbuf); - if (sent > 0) - { - if (! h.addOffset(sent)) - { - h.removeEvent(Event.write); - reg(h); - } - } - else if (sent == EOF) - { - // EAGAIN ? probably shouldn't have happened. - } - else - { - log.error("Socket send return ERR"); - } - return sent; - } - return 0; - } - - /************************************************************************** - - receive - IncomingHandlerD - Default incoming data handler. Should be replaced with something useful. - - **************************************************************************/ - int receive(ConnectionHandler h, RegisterD reg) - { - - } - -private - ConnectionHandler manager; - SelectLoop select; - CircularSeq!(ConnectionHandler) children; -} - -class RawClient -{ - ConnectionHandler manager; - SelectLoop select; - - this(ConnectionHandler mgr, SelectLoop sel) - { - manager = mgr; - mgr.events(Event.Read); - sel.addConnection(mgr); - select = sel; - } -}
--- a/dreactor/protocol/RawTcp.d Tue Jul 08 11:22:39 2008 -0400 +++ b/dreactor/protocol/RawTcp.d Tue Jul 08 12:16:07 2008 -0400 @@ -8,7 +8,7 @@ import tango.util.log.Config; import dreactor.transport.AsyncSocketConduit; -import dreactor.core.SelectLoop; +import dreactor.core.Vat; import dreactor.core.ConnectionHandler; /****************************************************************************** @@ -20,7 +20,7 @@ { public Logger log; - this(ConnectionHandler mgr, SelectLoop sel, IPv4Address addr) + this(ConnectionHandler mgr, Vat sel, IPv4Address addr) { manager = mgr; mgr.events(Event.Read); @@ -30,7 +30,7 @@ mgr.listen(addr); sel.addConnection(mgr); - select = sel; + vat = sel; log = Log.lookup("dreactor.protocol.RawTcpServer"); log.info("log initialized"); children = new CircularSeq!(ConnectionHandler); @@ -42,7 +42,7 @@ (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); ConnectionHandler h = ConnectionHandler.New(newcond, manager); h.events(Event.Read); - select.addConnection(h); + vat.addConnection(h); children.append(h); log.info("accepted new connection"); return 0; @@ -55,7 +55,7 @@ if (h.appendOutBuffer(outbuf)) { h.addEvent(Event.Write); - select.addConnection(h); + vat.addConnection(h); } } return 0; @@ -68,7 +68,7 @@ private ConnectionHandler manager; - SelectLoop select; + Vat vat; CircularSeq!(ConnectionHandler) children; } @@ -76,21 +76,21 @@ { public Logger log; - this(ConnectionHandler mgr, SelectLoop sel, Event evts = Event.Read) + this(ConnectionHandler mgr, Vat sel, Event evts = Event.Read) { manager = mgr; manager.events(evts); connected = false; mgr.setOutgoingHandler(&Handlers.onSend); mgr.setIncomingHandler(&Handlers.onReceive); - select = sel; + vat = sel; log = Log.lookup("dreactor.protocol.RawTcpClient"); } int connect(IPv4Address addr) { (cast(AsyncSocketConduit) manager.transport()).connect(addr); - select.addConnection(manager); + vat.addConnection(manager); connected = true; log.info("connected to {}", addr); return 0; @@ -106,15 +106,14 @@ **************************************************************************/ int send(char[] outbuf, IPv4Address addr = null) { - log.info("sending buffer: {}", outbuf); if (!connected) { - log.info("not connected, connecting"); + log.info("send: not connected, connecting"); if (addr !is null) { if (0 > connect(addr)) { - log.error("unable to connect"); + log.error("send: unable to connect"); return -1; } } @@ -122,7 +121,7 @@ if (manager.appendOutBuffer(outbuf)) { manager.addEvent(Event.Write); - if (!select.addConnection(manager)) + if (!vat.addConnection(manager)) { log.error("unable to register mgr"); } @@ -132,7 +131,7 @@ private ConnectionHandler manager; - SelectLoop select; + Vat vat; bool connected; } @@ -200,7 +199,7 @@ char inbuf[8192]; int amt; if((amt = h.transport.read(inbuf)) > 0) - log.info("Received Buffer: {}", inbuf[0 .. amt]); + log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); else log.error("Received no data, err = {}", amt);
--- a/dsss.conf Tue Jul 08 11:22:39 2008 -0400 +++ b/dsss.conf Tue Jul 08 12:16:07 2008 -0400 @@ -1,3 +1,5 @@ [test/test.d] buildFlags=-debug -gc +[test/longtest.d] +buildFlags=-debug -gc
--- a/test/test.d Tue Jul 08 11:22:39 2008 -0400 +++ b/test/test.d Tue Jul 08 12:16:07 2008 -0400 @@ -3,7 +3,7 @@ import tango.net.Socket; import tango.core.Thread; import tango.io.Stdout; -import dreactor.core.SelectLoop; +import dreactor.core.Vat; import dreactor.core.ConnectionHandler; import dreactor.protocol.RawTcp; import dreactor.transport.AsyncSocketConduit; @@ -12,15 +12,15 @@ { AsyncSocketConduit cond = new AsyncSocketConduit; ConnectionHandler lh = new ConnectionHandler(cond, true); - SelectLoop l_loop = new SelectLoop(); - RawTCPListener listener = new RawTCPListener(lh, l_loop, new IPv4Address(5555)); - l_loop.run(); + Vat l_vat = new Vat(); + RawTCPListener listener = new RawTCPListener(lh, l_vat, new IPv4Address(5555)); + l_vat.run(); AsyncSocketConduit clcond = new AsyncSocketConduit; ConnectionHandler ch = new ConnectionHandler(clcond); - SelectLoop c_loop = new SelectLoop(); - RawTCPClient client = new RawTCPClient(ch, c_loop); - c_loop.run(); + Vat c_vat = new Vat(); + RawTCPClient client = new RawTCPClient(ch, c_vat); + c_vat.run(); client.connect(new IPv4Address("localhost", 5555)); //Thread.sleep(1);