Mercurial > projects > dreactor
view dreactor/protocol/RawTcp.d @ 9:5412a1ff2e49
adding chat client and more updates
author | rick@minifunk |
---|---|
date | Sat, 12 Jul 2008 10:42:41 -0400 |
parents | 60cf25102fb2 |
children | e75a2e506b1d |
line wrap: on
line source
module dreactor.protocol.RawTcp;

import tango.io.Conduit;
import tango.io.selector.model.ISelector;
import tango.net.Socket;
import tango.util.collection.CircularSeq;
import tango.util.log.Log;
import tango.util.log.Config;
import dreactor.transport.AsyncSocketConduit;
import dreactor.core.Vat;
import dreactor.core.Dispatcher;

/******************************************************************************

    Basic TCP server or client routines for sending raw data.

******************************************************************************/
class RawTCPListener : RawTCPHandler
{
    public Logger log;

    /**************************************************************************

        Builds a listener on top of an existing Dispatcher.

        Installs this object's handlers on mgr, starts listening on addr and
        registers mgr with sel.

        The instance fields (vat, log, children) are assigned BEFORE the
        dispatcher is told to listen and before it is handed to the Vat,
        because accept/onReceive/broadcast dereference them. This also
        guarantees they are set if listen() or addConnection() throws.

    **************************************************************************/
    this(Dispatcher mgr, Vat sel, IPv4Address addr)
    {
        // fully initialise our own state first
        manager = mgr;
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpServer");
        log.info("log initialized");
        children = new CircularSeq!(Dispatcher);

        // then wire up the dispatcher and publish it
        mgr.events(Event.Read);
        mgr.setOutgoingHandler(&onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setConnectHandler(&accept);
        mgr.setErrorHandler(&onError);
        mgr.setDisconnectHandler(&onHangup);
        mgr.listen(addr);

        sel.addConnection(mgr);
    }

    /**************************************************************************

        Convenience constructor: creates a fresh AsyncSocketConduit with
        address reuse enabled, wraps it in a Dispatcher and delegates to the
        constructor above.

    **************************************************************************/
    this(Vat sel, IPv4Address addr)
    {
        AsyncSocketConduit cond = new AsyncSocketConduit;
        cond.socket().setAddressReuse(true);
        Dispatcher lh = new Dispatcher(cond, true);
        this(lh, sel, addr);
    }

    /**************************************************************************

        Shuts down and detaches every child transport, then the listening
        transport.

        The null checks cover an instance whose construction did not
        complete.

        NOTE(review): when a destructor is run by the garbage collector
        (rather than by an explicit delete), the D specification does not
        guarantee that referenced GC objects are still valid. Confirm that
        listeners are destroyed deterministically.

    **************************************************************************/
    ~this()
    {
        if (children !is null)
        {
            foreach(Dispatcher d; children)
            {
                (cast(AsyncSocketConduit)d.transport).shutdown();
                (cast(AsyncSocketConduit)d.transport).detach();
            }
        }

        if (manager !is null)
        {
            (cast(AsyncSocketConduit)manager.transport).shutdown();
            (cast(AsyncSocketConduit)manager.transport).detach();
        }
    }

    /**************************************************************************

        accept

        Connect handler registered on the listening dispatcher. Accepts the
        pending connection from cond into a new AsyncSocketConduit, creates
        a child Dispatcher from it via Dispatcher.New(newcond, manager),
        registers the child for Read events with the Vat and records it in
        children.

        Always returns 0. The reg argument is not used here.

    **************************************************************************/
    int accept(Conduit cond, RegisterD reg)
    {
        AsyncSocketConduit newcond = new AsyncSocketConduit;
        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);

        Dispatcher h = Dispatcher.New(newcond, manager);
        h.events(Event.Read);
        vat.addConnection(h);
        children.append(h);

        log.info("accepted new connection");
        return 0;
    }

    /**************************************************************************

        broadcast

        Queues outbuf on every child connection, skipping any dispatcher
        listed in excluded. A child whose appendOutBuffer call returns true
        gets the Write event added and is (re)registered with the Vat.

        Always returns 0.

    **************************************************************************/
    int broadcast(char[] outbuf, Dispatcher[] excluded = null)
    {
        foreach(Dispatcher h; children)
        {
            if (excluded && excluded.includes(h))
                continue;

            if (h.appendOutBuffer(outbuf))
            {
                h.addEvent(Event.Write);
                vat.addConnection(h);
            }
        }
        return 0;
    }

    /**************************************************************************

        send

        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free.

        The addr parameter is accepted but not used by this overload.
        Always returns 0; a failed registration is only logged.

    **************************************************************************/
    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
    {
        if (d.appendOutBuffer(outbuf))
        {
            d.addEvent(Event.Write);
            d.setOutgoingHandler(&onSend);
            if (!vat.addConnection(d))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

    /**************************************************************************

        onReceive

        Incoming-data handler for child connections. Reads up to 8192 bytes
        into a stack buffer and passes the filled slice to dataHandler if
        one is set, otherwise logs it.

        The slice handed to dataHandler refers to the local buffer, so it is
        only valid for the duration of the call.

        A read result of 0 removes the dispatcher from children, shuts its
        transport down and returns CLOSE. A negative result is logged and
        the handler returns REMAIN.

    **************************************************************************/
    public int onReceive(Dispatcher h)
    {
        // note: this local intentionally shadows the member logger, so
        // messages are emitted under "Handlers.onReceive"
        Logger log = Log.lookup("Handlers.onReceive");

        char inbuf[8192];
        int amt;
        if((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                children.remove(h);
                (cast(AsyncSocketConduit) h.transport).shutdown();
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }

    /// Currently a no-op.
    void close()
    {
    }

    private Dispatcher manager;         // the listening dispatcher
    Vat vat;                            // selector the dispatchers are registered with
    CircularSeq!(Dispatcher) children;  // dispatchers created by accept
}

/******************************************************************************

    Client side counterpart of RawTCPListener: owns a single Dispatcher,
    connects it to a remote address and queues outgoing data on it.

******************************************************************************/
class RawTCPClient : RawTCPHandler
{
    public Logger log;

    /**************************************************************************

        Builds a client on top of an existing Dispatcher, subscribing it to
        evts and installing this object's handlers. Nothing is registered
        with the Vat until connect() or send() is called.

    **************************************************************************/
    this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
    {
        manager = mgr;
        manager.events(evts);
        connected = false;
        mgr.setOutgoingHandler(&onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setErrorHandler(&onError);
        mgr.setDisconnectHandler(&onHangup);
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpClient");
    }

    /**************************************************************************

        Convenience constructor: creates a fresh AsyncSocketConduit, wraps
        it in a Dispatcher and delegates to the constructor above.

    **************************************************************************/
    this(Vat sel, Event evts = Event.Read)
    {
        AsyncSocketConduit clcond = new AsyncSocketConduit;
        Dispatcher ch = new Dispatcher(clcond);
        this(ch, sel, evts);
    }

    /**************************************************************************

        Shuts down and detaches the client transport.

        The null check covers an instance whose construction did not
        complete.

        NOTE(review): see the D specification on destructors run by the
        garbage collector; referenced GC objects may no longer be valid at
        that point. Confirm clients are destroyed deterministically.

    **************************************************************************/
    ~this()
    {
        if (manager !is null)
        {
            (cast(AsyncSocketConduit)manager.transport).shutdown();
            (cast(AsyncSocketConduit)manager.transport).detach();
        }
    }

    /**************************************************************************

        connect

        Connects the transport to addr, registers the dispatcher with the
        Vat and marks the client as connected.

        Always returns 0.

    **************************************************************************/
    int connect(IPv4Address addr)
    {
        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
        vat.addConnection(manager);
        connected = true;
        log.info("connected to {}", addr);
        return 0;
    }

    /**************************************************************************

        send

        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free.

        If the client is not yet connected and addr is given, connect(addr)
        is attempted first. Returns -1 if that reports failure, otherwise 0.

        NOTE(review): connect() as written always returns 0, so the -1 path
        cannot currently be taken. Also, when not connected and addr is
        null, the data is still queued and the dispatcher registered;
        confirm this is intended (e.g. for a dispatcher that was handed in
        already connected).

    **************************************************************************/
    int send(char[] outbuf, IPv4Address addr = null)
    {
        if (!connected)
        {
            log.info("send: not connected, connecting");
            if (addr !is null)
            {
                if (0 > connect(addr))
                {
                    log.error("send: unable to connect");
                    return -1;
                }
            }
        }

        if (manager.appendOutBuffer(outbuf))
        {
            manager.addEvent(Event.Write);
            if (!vat.addConnection(manager))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

    private Dispatcher manager;  // the client's single dispatcher
    Vat vat;                     // selector the dispatcher is registered with
    bool connected;              // set by connect()
}

/******************************************************************************

    Default Event handlers common to both listener/clients

******************************************************************************/
class RawTCPHandler
{
    /**************************************************************************

        onSend
        OutgoingHandlerD

        To be registered as the response to socket writable event.
        Writes the dispatcher's next pending buffer to its transport.

        Returns REREGISTER, after removing the Write event, when there is
        no pending buffer or when addOffset reports false after a write.
        Otherwise returns REMAIN; failed writes are logged.

    ***************************************************************************/
    public int onSend(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onSend");

        char[] outbuf = h.nextBuffer();
        if (outbuf !is null)
        {
            int sent = h.transport.write(outbuf);
            if (sent > 0)
            {
                if (! h.addOffset(sent))
                {
                    h.remEvent(Event.Write);
                    return REREGISTER;
                }
            }
            else if (sent == AsyncSocketConduit.Eof)
            {
                log.error("Select said socket was writable, but sent 0 bytes");
            }
            else
            {
                log.error("Socket send return ERR");
            }
            return REMAIN;
        }
        else
        {
            h.remEvent(Event.Write);
            return REREGISTER;
        }
    }

    /**************************************************************************

        receive
        IncomingHandlerD

        Default incoming data handler. Should be replaced with something
        useful.

        Reads up to 8192 bytes into a stack buffer and passes the filled
        slice to dataHandler if one is set, otherwise logs it. The slice
        refers to the local buffer and is only valid during the call.

        Returns CLOSE when the read yields 0, REMAIN otherwise; a negative
        read result is logged.

    **************************************************************************/
    public int onReceive(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onReceive");

        char inbuf[8192];
        int amt;
        if((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }

    /// Default disconnect handler: always returns UNREGISTER.
    int onHangup(Dispatcher d)
    {
        return UNREGISTER;
    }

    /// Default error handler: always returns CLOSE; unreg is not used.
    int onError(Dispatcher d, RegisterD unreg)
    {
        return CLOSE;
    }

    /// Installs the delegate that onReceive calls with each chunk of data.
    void setDataHandler(void delegate(char[],Dispatcher) del)
    {
        dataHandler = del;
    }

    // called by onReceive when set; otherwise received data is only logged
    protected void delegate(char[], Dispatcher) dataHandler;
}

/******************************************************************************

    Returns true if needle is one of the elements of haystack, compared by
    identity (is), false otherwise (including for an empty or null array).

******************************************************************************/
bool includes(Dispatcher[] haystack, Dispatcher needle)
{
    foreach(Dispatcher h; haystack)
    {
        if (h is needle)
            return true;
    }
    return false;
}