Mercurial > projects > dreactor
changeset 8:60cf25102fb2
fixing mercurial's screwup
author | rick@minifunk |
---|---|
date | Wed, 09 Jul 2008 00:56:21 -0400 |
parents | 2c6ab06a8829 |
children | 5412a1ff2e49 |
files | dreactor/core/Vat.d dreactor/protocol/RawTcp.d dsss.conf |
diffstat | 3 files changed, 99 insertions(+), 17 deletions(-) [+] |
line wrap: on
line diff
--- a/dreactor/core/Vat.d Wed Jul 09 00:32:11 2008 -0400 +++ b/dreactor/core/Vat.d Wed Jul 09 00:56:21 2008 -0400 @@ -26,7 +26,7 @@ Logger log; -enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; +enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; static char[] version_string = "Vat.d 0.1 2008-05-31"; @@ -143,6 +143,11 @@ { switch(result) { + case CLOSE: + s.unregister(h.transport); + (cast (AsyncSocketConduit) h.transport).shutdown(); + (cast (AsyncSocketConduit) h.transport).detach(); + break; case UNREGISTER: s.unregister(h.transport); break;
--- a/dreactor/protocol/RawTcp.d Wed Jul 09 00:32:11 2008 -0400 +++ b/dreactor/protocol/RawTcp.d Wed Jul 09 00:56:21 2008 -0400 @@ -16,7 +16,7 @@ Basic TCP server or client routines for sending raw data. ******************************************************************************/ -class RawTCPListener +class RawTCPListener : RawTCPHandler { public Logger log; @@ -24,9 +24,11 @@ { manager = mgr; mgr.events(Event.Read); - mgr.setOutgoingHandler(&Handlers.onSend); - mgr.setIncomingHandler(&Handlers.onReceive); + mgr.setOutgoingHandler(&onSend); + mgr.setIncomingHandler(&onReceive); mgr.setConnectHandler(&accept); + mgr.setErrorHandler(&onError); + mgr.setDisconnectHandler(&onHangup); mgr.listen(addr); sel.addConnection(mgr); @@ -35,6 +37,12 @@ log.info("log initialized"); children = new CircularSeq!(Dispatcher); } + this(Vat sel, IPv4Address addr) + { + AsyncSocketConduit cond = new AsyncSocketConduit; + Dispatcher lh = new Dispatcher(cond, true); + this(lh, sel, addr); + } int accept(Conduit cond, RegisterD reg) { @@ -48,10 +56,12 @@ return 0; } - int broadcast(char[] outbuf) + int broadcast(char[] outbuf, Dispatcher[] excluded) { foreach(Dispatcher h; children) { + if (excluded && excluded.includes(h)) + continue; if (h.appendOutBuffer(outbuf)) { h.addEvent(Event.Write); @@ -60,6 +70,31 @@ } return 0; } + + public int onReceive(Dispatcher h) + { + Logger log = Log.lookup("Handlers.onReceive"); + + char inbuf[8192]; + int amt; + if((amt = h.transport.read(inbuf)) > 0) + { + if (dataHandler) + dataHandler(inbuf[0 .. amt], h); + else + log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); + } + else + { + if (amt == 0) + { + children.remove(h); + return CLOSE; + } + log.error("Received no data, err = {}", amt); + } + return REMAIN; + } void close() { @@ -72,7 +107,7 @@ CircularSeq!(Dispatcher) children; } -class RawTCPClient +class RawTCPClient : RawTCPHandler { public Logger log; @@ -81,11 +116,19 @@ manager = mgr; manager.events(evts); connected = false; - mgr.setOutgoingHandler(&Handlers.onSend); - mgr.setIncomingHandler(&Handlers.onReceive); + mgr.setOutgoingHandler(&onSend); + mgr.setIncomingHandler(&onReceive); + mgr.setErrorHandler(&onError); + mgr.setDisconnectHandler(&onHangup); vat = sel; log = Log.lookup("dreactor.protocol.RawTcpClient"); } + this(Vat sel, Event evts = Event.Read) + { + AsyncSocketConduit clcond = new AsyncSocketConduit; + Dispatcher ch = new Dispatcher(clcond); + this(ch, sel, evts); + } int connect(IPv4Address addr) { @@ -141,7 +184,7 @@ Default Event handlers common to both listener/clients ******************************************************************************/ -class Handlers +class RawTCPHandler { /************************************************************************** @@ -152,7 +195,7 @@ if there is no more data left to send. ***************************************************************************/ -public static int onSend(Dispatcher h) +public int onSend(Dispatcher h) { Logger log = Log.lookup("Handlers.onSend"); @@ -192,17 +235,52 @@ Default incoming data handler. Should be replaced with something useful. **************************************************************************/ -public static int onReceive(Dispatcher h) +public int onReceive(Dispatcher h) { Logger log = Log.lookup("Handlers.onReceive"); char inbuf[8192]; int amt; if((amt = h.transport.read(inbuf)) > 0) - log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); - else + { + if (dataHandler) + dataHandler(inbuf[0 .. amt], h); + else + log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); + } + else + { + if (amt == 0) + { + return CLOSE; + } log.error("Received no data, err = {}", amt); - + } return REMAIN; } +int onHangup(Dispatcher d) +{ + return UNREGISTER; } +int onError(Dispatcher d, RegisterD unreg) +{ + return CLOSE; +} + +void setDataHandler(void delegate(char[],Dispatcher) del) +{ + dataHandler = del; +} +protected + void delegate(char[], Dispatcher) dataHandler; +} + +bool includes(Dispatcher[] haystack, Dispatcher needle) +{ + foreach(Dispatcher h; haystack) + { + if (h is needle) + return true; + } + return false; +}