Mercurial > projects > dreactor
diff asyncdreactor/protocol/RawTcp.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | dreactor/protocol/RawTcp.d@e75a2e506b1d |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/asyncdreactor/protocol/RawTcp.d Tue Aug 12 16:59:56 2008 -0400 @@ -0,0 +1,345 @@ +module dreactor.protocol.RawTcp; + +import tango.io.Conduit; +import tango.io.selector.model.ISelector; +import tango.net.Socket; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; +import tango.util.log.Config; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.Vat; +import dreactor.core.Dispatcher; + +/****************************************************************************** + + Basic TCP server or client routines for sending raw data. + +******************************************************************************/ +class RawTCPListener +{ +public + this(Dispatcher mgr, Vat sel, IPv4Address addr) + { + manager = mgr; + mgr.events(Event.Read); + mgr.setOutgoingHandler(&RawTCPHandler.onSend); + mgr.setIncomingHandler(&onReceive); + mgr.setConnectHandler(&accept); + mgr.setErrorHandler(&RawTCPHandler.onError); + mgr.setDisconnectHandler(&RawTCPHandler.onHangup); + mgr.listen(addr); + + sel.addConnection(mgr); + vat = sel; + log = Log.lookup("dreactor.protocol.RawTcpServer"); + log.info("log initialized"); + children = new CircularSeq!(Dispatcher); + } + this(Vat sel, IPv4Address addr) + { + AsyncSocketConduit cond = new AsyncSocketConduit; + cond.socket().setAddressReuse(true); + Dispatcher lh = new Dispatcher(cond, true); + this(lh, sel, addr); + } + + ~this() + { + close(); + } + + int accept(Conduit cond, RegisterD reg) + { + AsyncSocketConduit newcond = new AsyncSocketConduit; + (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); + Dispatcher h = Dispatcher.New(newcond, manager); + h.events(Event.Read); + vat.addConnection(h); + children.append(h); + log.info("accepted new connection"); + return 0; + } + + int broadcast(char[] outbuf, Dispatcher[] excluded = null) + { + foreach(Dispatcher h; children) + { + if (excluded && excluded.includes(h)) + continue; + if (h.appendOutBuffer(outbuf)) + { + h.addEvent(Event.Write); + vat.addConnection(h); + } + } + return 0; + } + + /************************************************************************** + + send + User-called function to send data to the counterpart at the other + end of the connection. This sets up the connection manager to send + data as the socket becomes free. + + **************************************************************************/ + int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) + { + if (d.appendOutBuffer(outbuf)) + { + d.addEvent(Event.Write); + if (!vat.addConnection(d)) + { + log.error("unable to register mgr"); + } + } + return 0; + } + + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + int onReceive(Dispatcher h) + { + Logger log = Log.lookup("Handlers.onReceive"); + + char inbuf[8192]; + int amt; + if((amt = h.transport.read(inbuf)) > 0) + { + if (dataHandler) + dataHandler(inbuf[0 .. amt], h); + else + log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); + } + else + { + if (amt == 0) + { + children.remove(h); + (cast(AsyncSocketConduit) h.transport).shutdown(); + return CLOSE; + } + log.error("Received no data, err = {}", amt); + } + return REMAIN; + } + + void close() + { + foreach(Dispatcher d; children) + { + (cast(AsyncSocketConduit)d.transport).shutdown(); + (cast(AsyncSocketConduit)d.transport).detach(); + } + (cast(AsyncSocketConduit)manager.transport).shutdown(); + (cast(AsyncSocketConduit)manager.transport).detach(); + + } + + void setDataHandler(void delegate(char[], Dispatcher) h) + { + dataHandler = h; + } + +private + Vat vat; + CircularSeq!(Dispatcher) children; + Dispatcher manager; + Logger log; + RawTCPHandler h; + void delegate(char[], Dispatcher) dataHandler; +} + +class RawTCPClient +{ + +public + this(Dispatcher mgr, Vat sel, Event evts = Event.Read) + { + manager = mgr; + manager.events(evts); + connected = false; + mgr.setOutgoingHandler(&RawTCPHandler.onSend); + mgr.setIncomingHandler(&onReceive); + mgr.setErrorHandler(&RawTCPHandler.onError); + mgr.setDisconnectHandler(&RawTCPHandler.onHangup); + vat = sel; + log = Log.lookup("dreactor.protocol.RawTcpClient"); + } + + this(Vat sel, Event evts = Event.Read) + { + AsyncSocketConduit clcond = new AsyncSocketConduit; + Dispatcher ch = new Dispatcher(clcond); + this(ch, sel, evts); + } + + ~this() + { + (cast(AsyncSocketConduit)manager.transport).shutdown(); + (cast(AsyncSocketConduit)manager.transport).detach(); + } + + int connect(IPv4Address addr) + { + (cast(AsyncSocketConduit) manager.transport()).connect(addr); + vat.addConnection(manager); + connected = true; + log.info("connected to {}", addr); + return 0; + } + + /************************************************************************** + + send + User-called function to send data to the counterpart at the other + end of the connection. This sets up the connection manager to send + data as the socket becomes free. + + **************************************************************************/ + int send(char[] outbuf, IPv4Address addr = null) + { + if (!connected) + { + log.info("send: not connected, connecting"); + if (addr !is null) + { + if (0 > connect(addr)) + { + log.error("send: unable to connect"); + return -1; + } + } + } + if (manager.appendOutBuffer(outbuf)) + { + manager.addEvent(Event.Write); + if (!vat.addConnection(manager)) + { + log.error("unable to register mgr"); + } + } + return 0; + } + + /************************************************************************** + + receive + IncomingHandlerD + Default incoming data handler. Should be replaced with something useful. + + **************************************************************************/ + public int onReceive(Dispatcher h) + { + Logger log = Log.lookup("Handlers.onReceive"); + + char inbuf[8192]; + int amt; + if((amt = h.transport.read(inbuf)) > 0) + { + if (dataHandler) + dataHandler(inbuf[0 .. amt], h); + else + log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); + } + else + { + if (amt == 0) + { + return CLOSE; + } + log.error("Received no data, err = {}", amt); + } + return REMAIN; + } + + void setDataHandler(void delegate(char[], Dispatcher) h) + { + dataHandler = h; + } +private + void delegate(char[], Dispatcher) dataHandler; + Dispatcher manager; + Vat vat; + bool connected; + Logger log; + RawTCPHandler h; +} + + +/****************************************************************************** + + Default Event handlers common to both listener/clients + +******************************************************************************/ +struct RawTCPHandler +{ + /************************************************************************** + + onSend + OutgoingHandlerD + To be registered as the response to socket writable event. + Sends data, returns amount sent. Unregisters Handler for sending + if there is no more data left to send. + + ***************************************************************************/ + public static int onSend(Dispatcher h) + { + Logger log = Log.lookup("Handlers.onSend"); + + char[] outbuf = h.nextBuffer(); + if (outbuf !is null) + { + int sent = h.transport.write(outbuf); + if (sent > 0) + { + if (! h.addOffset(sent)) + { + h.remEvent(Event.Write); + return REREGISTER; + } + } + else if (sent == AsyncSocketConduit.Eof) + { + log.error("Select said socket was writable, but sent 0 bytes"); + } + else + { + log.error("Socket send return ERR"); + } + return REMAIN; + } + else + { + h.remEvent(Event.Write); + return REREGISTER; + } + } + + static int onHangup(Dispatcher d) + { + return UNREGISTER; + } + + static int onError(Dispatcher d, RegisterD unreg) + { + return CLOSE; + } + +} + +bool includes(Dispatcher[] haystack, Dispatcher needle) +{ + foreach(Dispatcher h; haystack) + { + if (h is needle) + return true; + } + return false; +}