Mercurial > projects > dreactor
view dreactor/protocol/TcpProvider.d @ 13:8c9b1276f623 default tip
bug fixes
author | rick@minifunk |
---|---|
date | Sat, 20 Sep 2008 18:33:11 -0400 |
parents | d6a3cfe7c3de |
children |
line wrap: on
line source
module dreactor.protocol.TcpProvider; import tango.io.device.Conduit; import tango.io.selector.model.ISelector; import tango.net.Socket; import tango.util.container.CircularList; import tango.util.log.Log; import tango.util.log.Config; import dreactor.transport.AsyncSocketConduit; import dreactor.core.Vat; public import dreactor.protocol.IProvider; /****************************************************************************** Basic TCP server or client routines for sending raw data. ******************************************************************************/ class TcpProvider : IProvider { public enum { SendComplete = 2000, NewConnection, Receive, RemoteClosed, SendError, ReceiveError, Error } this(AsyncSocketConduit c) { log = Log.lookup("dreactor.protocol.RawTcpServer"); log.info("log initialized"); cond = c; events = Event.Read; } this(IPv4Address addr, bool listen = false) { AsyncSocketConduit c = new AsyncSocketConduit; c.socket().setAddressReuse(true); if (listen) { c.bind(addr); c.socket().listen(1000); listener = listen; } else c.connect(addr); this(c); } ~this() { close(); } Message handleRead() { Logger log = Log.lookup("Handlers.onReceive"); if (listener) return handleConnect(); char inbuf[8192]; int amt; if((amt = cond.read(inbuf)) > 0) { return Message(inbuf[0 .. amt].dup.ptr, Receive, amt); } else { if (amt == 0) { cond.shutdown(); return Message(null, RemoteClosed, amt); } log.error("Received no data, err = {}", amt); } return Message(null, Error, amt); } /************************************************************************** handleWrite To be registered as the response to socket writable event. Sends data, returns amount sent. Unregisters Handler for sending if there is no more data left to send. ***************************************************************************/ Message handleWrite() { Logger log = Log.lookup("Handlers.onSend"); char[] outbuf = nextBuffer(); if (outbuf !is null) { int sent = cond.write(outbuf); if (sent > 0) { if (! addOffset(sent)) { //h.remEvent(Event.Write); //TODO - How do we handle event re-registering return Message(null, SendComplete, sent); } } else if (sent == 0) { log.error("Select said socket was writable, but sent 0 bytes"); return Message(null, Error, 0); } else { log.error("Socket send return ERR"); return Message(null, Error, sent); } } else { remEvent(Event.Write); if (!regFn(events)) { log.error("unable to register mgr"); } return Message(null, SendComplete, 0); } } Message handleDisconnect() { return Message(cast(void*)cond, RemoteClosed, 0); } Message handleError() { return Message(cast(void*)cond, Error, 0); } Message handleConnect() { log.trace("accepting new connection"); return Message(cast(void*)accept(), NewConnection, 0); } Conduit getConduit() { return cond; } Event getEvents() { return events; } void setEvents(Event e) { events = e; } AsyncSocketConduit accept() { AsyncSocketConduit newcond = new AsyncSocketConduit; cond.socket().accept(newcond.socket); log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle()); return newcond; } /************************************************************************** send User-called function to send data to the counterpart at the other end of the connection. This sets up the connection manager to send data as the socket becomes free. **************************************************************************/ void send(char[] outbuf) { if (appendOutBuffer(outbuf)) { addEvent(Event.Write); if (!regFn(events)) { log.error("unable to register mgr"); } } } void close() { cond.shutdown(); cond.detach(); } int connect(IPv4Address addr) { cond = new AsyncSocketConduit; cond.socket().setAddressReuse(true); cond.connect(addr); connected = true; log.info("connected to {}", addr); return 0; } /************************************************************************** appendOutBuffer Adds an outgoing buffer to the list. This returns true if the list was empty, indicating that the handler should be registered with the SelectLoop. If it returns false, it was probably already registered. **************************************************************************/ bool appendOutBuffer(char[] outbuf) { out_buffers.append(outbuf); out_buffers_len++; if (out_buffers_len == 1) return true; else return false; } /************************************************************************** addOffset Use this function to update the offset position after a successful data send. This not only manages the current offset, but will update the out buffer chain if necessary. Returns: false if there is nothing left to send, true if there is. **************************************************************************/ bool addOffset(int off) in { assert(out_buffers_len > 0); } body { char[] hd = out_buffers.head(); if ((off + o_offset) >= hd.length) { out_buffers.removeHead(); o_offset = 0; out_buffers_len--; return (out_buffers_len > 0); } else o_offset += off; return true; } /************************************************************************** char[] nextBuffer Returns a slice of the current outbound buffer, returns a char[] pointing to null if there is no current outbound buffer **************************************************************************/ char[] nextBuffer() { if (out_buffers_len < 1) { return null; } return out_buffers.head()[o_offset .. $]; } void setRegisterFunc( bool delegate (Event) fn) { regFn = fn; } void addEvent(Event evt) { events |= evt; } void remEvent(Event evt) { events &= !evt; } private AsyncSocketConduit cond; Logger log; bool listener; Event events; bool connected; CircularList!(char[]) out_buffers; int out_buffers_len; int o_offset; bool delegate (Event) regFn; }