Mercurial > projects > dreactor
view dreactor/protocol/RawTcp.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | e75a2e506b1d |
children |
line wrap: on
line source
module dreactor.protocol.RawTcp; import tango.io.Conduit; import tango.io.selector.model.ISelector; import tango.net.Socket; import tango.util.collection.CircularSeq; import tango.util.log.Log; import tango.util.log.Config; import dreactor.transport.AsyncSocketConduit; import dreactor.core.Vat; import dreactor.core.Dispatcher; /****************************************************************************** Basic TCP server or client routines for sending raw data. ******************************************************************************/ class RawTCPListener { public this(AsyncSocketConduit cond) { log = Log.lookup("dreactor.protocol.RawTcpServer"); log.info("log initialized"); children = new CircularSeq!(Dispatcher); } this(Vat sel, IPv4Address addr) { AsyncSocketConduit cond = new AsyncSocketConduit; cond.socket().setAddressReuse(true); this(cond); } ~this() { close(); } AsyncSocketConduit accept(Conduit cond, RegisterD reg) { AsyncSocketConduit newcond = new AsyncSocketConduit; (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); h.events(Event.Read); vat.addConnection(h); children.append(h); log.info("accepted new connection"); return newcond; } int broadcast(char[] outbuf, AsyncSocketConduit[] recips) { foreach(AsyncSocketConduit c; recips) { if (c.appendOutBuffer(outbuf)) { h.addEvent(Event.Write); vat.addConnection(h); } } return 0; } /************************************************************************** send User-called function to send data to the counterpart at the other end of the connection. This sets up the connection manager to send data as the socket becomes free. **************************************************************************/ int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) { if (d.appendOutBuffer(outbuf)) { d.addEvent(Event.Write); if (!vat.addConnection(d)) { log.error("unable to register mgr"); } } return 0; } /************************************************************************** receive IncomingHandlerD Default incoming data handler. Should be replaced with something useful. **************************************************************************/ int onReceive(Dispatcher h) { Logger log = Log.lookup("Handlers.onReceive"); char inbuf[8192]; int amt; if((amt = h.transport.read(inbuf)) > 0) { if (dataHandler) dataHandler(inbuf[0 .. amt], h); else log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); } else { if (amt == 0) { children.remove(h); (cast(AsyncSocketConduit) h.transport).shutdown(); return CLOSE; } log.error("Received no data, err = {}", amt); } return REMAIN; } void close() { foreach(Dispatcher d; children) { (cast(AsyncSocketConduit)d.transport).shutdown(); (cast(AsyncSocketConduit)d.transport).detach(); } (cast(AsyncSocketConduit)manager.transport).shutdown(); (cast(AsyncSocketConduit)manager.transport).detach(); } void setDataHandler(void delegate(char[], Dispatcher) h) { dataHandler = h; } private Vat vat; CircularSeq!(Dispatcher) children; Dispatcher manager; Logger log; RawTCPHandler h; void delegate(char[], Dispatcher) dataHandler; } class RawTCPClient { public this(Dispatcher mgr, Vat sel, Event evts = Event.Read) { manager = mgr; manager.events(evts); connected = false; mgr.setOutgoingHandler(&RawTCPHandler.onSend); mgr.setIncomingHandler(&onReceive); mgr.setErrorHandler(&RawTCPHandler.onError); mgr.setDisconnectHandler(&RawTCPHandler.onHangup); vat = sel; log = Log.lookup("dreactor.protocol.RawTcpClient"); } this(Vat sel, Event evts = Event.Read) { AsyncSocketConduit clcond = new AsyncSocketConduit; Dispatcher ch = new Dispatcher(clcond); this(ch, sel, evts); } ~this() { (cast(AsyncSocketConduit)manager.transport).shutdown(); (cast(AsyncSocketConduit)manager.transport).detach(); } int connect(IPv4Address addr) { (cast(AsyncSocketConduit) manager.transport()).connect(addr); vat.addConnection(manager); connected = true; log.info("connected to {}", addr); return 0; } /************************************************************************** send User-called function to send data to the counterpart at the other end of the connection. This sets up the connection manager to send data as the socket becomes free. **************************************************************************/ int send(char[] outbuf, IPv4Address addr = null) { if (!connected) { log.info("send: not connected, connecting"); if (addr !is null) { if (0 > connect(addr)) { log.error("send: unable to connect"); return -1; } } } if (manager.appendOutBuffer(outbuf)) { manager.addEvent(Event.Write); if (!vat.addConnection(manager)) { log.error("unable to register mgr"); } } return 0; } /************************************************************************** receive IncomingHandlerD Default incoming data handler. Should be replaced with something useful. **************************************************************************/ public int onReceive(Dispatcher h) { Logger log = Log.lookup("Handlers.onReceive"); char inbuf[8192]; int amt; if((amt = h.transport.read(inbuf)) > 0) { if (dataHandler) dataHandler(inbuf[0 .. amt], h); else log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]); } else { if (amt == 0) { return CLOSE; } log.error("Received no data, err = {}", amt); } return REMAIN; } void setDataHandler(void delegate(char[], Dispatcher) h) { dataHandler = h; } private void delegate(char[], Dispatcher) dataHandler; Dispatcher manager; Vat vat; bool connected; Logger log; RawTCPHandler h; } /****************************************************************************** Default Event handlers common to both listener/clients ******************************************************************************/ struct RawTCPHandler { /************************************************************************** onSend OutgoingHandlerD To be registered as the response to socket writable event. Sends data, returns amount sent. Unregisters Handler for sending if there is no more data left to send. ***************************************************************************/ public static int onSend(Dispatcher h) { Logger log = Log.lookup("Handlers.onSend"); char[] outbuf = h.nextBuffer(); if (outbuf !is null) { int sent = h.transport.write(outbuf); if (sent > 0) { if (! h.addOffset(sent)) { h.remEvent(Event.Write); return REREGISTER; } } else if (sent == AsyncSocketConduit.Eof) { log.error("Select said socket was writable, but sent 0 bytes"); } else { log.error("Socket send return ERR"); } return REMAIN; } else { h.remEvent(Event.Write); return REREGISTER; } } static int onHangup(Dispatcher d) { return UNREGISTER; } static int onError(Dispatcher d, RegisterD unreg) { return CLOSE; } } bool includes(Dispatcher[] haystack, Dispatcher needle) { foreach(Dispatcher h; haystack) { if (h is needle) return true; } return false; }