Mercurial > projects > dreactor
diff dreactor/protocol/TcpProvider.d @ 13:8c9b1276f623 default tip
bug fixes
author | rick@minifunk |
---|---|
date | Sat, 20 Sep 2008 18:33:11 -0400 |
parents | d6a3cfe7c3de |
children |
line wrap: on
line diff
--- a/dreactor/protocol/TcpProvider.d Wed Aug 27 00:47:33 2008 -0400 +++ b/dreactor/protocol/TcpProvider.d Sat Sep 20 18:33:11 2008 -0400 @@ -1,34 +1,31 @@ -module dreactor.protocol.RawTcp; +module dreactor.protocol.TcpProvider; import tango.io.device.Conduit; import tango.io.selector.model.ISelector; import tango.net.Socket; -import tango.util.collection.CircularSeq; +import tango.util.container.CircularList; import tango.util.log.Log; import tango.util.log.Config; import dreactor.transport.AsyncSocketConduit; import dreactor.core.Vat; -import dreactor.core.Dispatcher; - +public import dreactor.protocol.IProvider; /****************************************************************************** Basic TCP server or client routines for sending raw data. ******************************************************************************/ -class TCPProvider : IProvider +class TcpProvider : IProvider { public - - enum - { - RECEIVE = 1000, - SEND_COMPLETE, - NEW_CONNECTION, - REMOTE_CLOSED, - SEND_ERROR, - RECEIVE_ERROR, - ERROR + enum { + SendComplete = 2000, + NewConnection, + Receive, + RemoteClosed, + SendError, + ReceiveError, + Error } this(AsyncSocketConduit c) @@ -36,41 +33,53 @@ log = Log.lookup("dreactor.protocol.RawTcpServer"); log.info("log initialized"); cond = c; + events = Event.Read; } - this(Vat v, IPv4Address addr) + this(IPv4Address addr, bool listen = false) { - AsyncSocketConduit cond = new AsyncSocketConduit; - cond.socket().setAddressReuse(true); - this(cond); + AsyncSocketConduit c = new AsyncSocketConduit; + c.socket().setAddressReuse(true); + if (listen) + { + c.bind(addr); + c.socket().listen(1000); + listener = listen; + } + else + c.connect(addr); + this(c); } + ~this() { close(); } - Message handleRead(Conduit c) + Message handleRead() { Logger log = Log.lookup("Handlers.onReceive"); + if (listener) + return handleConnect(); + char inbuf[8192]; int amt; - if((amt = h.transport.read(inbuf)) > 0) + if((amt = cond.read(inbuf)) > 0) { - return new Message(inbuf[0 .. amt].dup, RECEIVE, amt); + return Message(inbuf[0 .. amt].dup.ptr, Receive, amt); } else { if (amt == 0) { - children.remove(h); - (cast(AsyncSocketConduit) h.transport).shutdown(); - return Message(null, REMOTE_CLOSED, amt); + cond.shutdown(); + return Message(null, RemoteClosed, amt); } log.error("Received no data, err = {}", amt); } - return new Message(null, ERROR, amt); + return Message(null, Error, amt); } /************************************************************************** @@ -81,7 +90,7 @@ if there is no more data left to send. ***************************************************************************/ - Message handleWrite(Conduit c) + Message handleWrite() { Logger log = Log.lookup("Handlers.onSend"); @@ -95,42 +104,45 @@ { //h.remEvent(Event.Write); //TODO - How do we handle event re-registering - return new Message(null, SEND_COMPLETE, sent); + return Message(null, SendComplete, sent); } } else if (sent == 0) { log.error("Select said socket was writable, but sent 0 bytes"); - return new Message(null, SEND_ERROR, 0); + return Message(null, Error, 0); } else { log.error("Socket send return ERR"); - return new Message(null, SEND_ERROR, sent); + return Message(null, Error, sent); } } else { - //h.remEvent(Event.Write); - //TODO - How do we handle event re-registering - - return new Message(null, SEND_COMPLETE, 0); + remEvent(Event.Write); + if (!regFn(events)) + { + log.error("unable to register mgr"); + } + return Message(null, SendComplete, 0); } } - Message handleDisconnect(Conduit c) + Message handleDisconnect() { - return new Message(c, REMOTE_CLOSED, 0); + return Message(cast(void*)cond, RemoteClosed, 0); } - Message handleError(Conduit c) + Message handleError() { - return new Messsage(null, ERROR, 0); + return Message(cast(void*)cond, Error, 0); } - Message handleConnect(Conduit c) + Message handleConnect() { - return new Message(accept(), NEW_CONNECTION, 0); + log.trace("accepting new connection"); + return Message(cast(void*)accept(), NewConnection, 0); } Conduit getConduit() @@ -138,7 +150,7 @@ return cond; } - int getEvents() + Event getEvents() { return events; } @@ -152,22 +164,9 @@ { AsyncSocketConduit newcond = new AsyncSocketConduit; cond.socket().accept(newcond.socket); - log.info("accepted new connection"); + log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle()); return newcond; } - - int broadcast(char[] outbuf, TCPProvider[] recips) - { - foreach(TCPProvider c; recips) - { - if (c.appendOutBuffer(outbuf)) - { - h.addEvent(Event.Write); - vat.addConnection(h); - } - } - return 0; - } /************************************************************************** @@ -177,18 +176,16 @@ data as the socket becomes free. **************************************************************************/ - int send(char[] outbufl) + void send(char[] outbuf) { if (appendOutBuffer(outbuf)) { - //TODO - should we always register for all events? or update it when needed? - //d.addEvent(Event.Write); - if (!vat.addConnection(d)) + addEvent(Event.Write); + if (!regFn(events)) { log.error("unable to register mgr"); } } - return 0; } @@ -198,13 +195,6 @@ cond.detach(); } - - ~this() - { - (cast(AsyncSocketConduit)manager.transport).shutdown(); - (cast(AsyncSocketConduit)manager.transport).detach(); - } - int connect(IPv4Address addr) { cond = new AsyncSocketConduit; @@ -218,35 +208,94 @@ /************************************************************************** - send - User-called function to send data to the counterpart at the other - end of the connection. This sets up the connection manager to send - data as the socket becomes free. + appendOutBuffer + + Adds an outgoing buffer to the list. This returns true if the list + was empty, indicating that the handler should be registered with the + SelectLoop. If it returns false, it was probably already registered. + + **************************************************************************/ + bool appendOutBuffer(char[] outbuf) + { + out_buffers.append(outbuf); + out_buffers_len++; + if (out_buffers_len == 1) + return true; + else + return false; + } + + /************************************************************************** + + addOffset + Use this function to update the offset position after a successful data + send. This not only manages the current offset, but will update the + out buffer chain if necessary. + + Returns: false if there is nothing left to send, true if there is. + + **************************************************************************/ + bool addOffset(int off) + in + { + assert(out_buffers_len > 0); + } + body + { + char[] hd = out_buffers.head(); + if ((off + o_offset) >= hd.length) + { + out_buffers.removeHead(); + o_offset = 0; + out_buffers_len--; + return (out_buffers_len > 0); + } + else + o_offset += off; + return true; + } + + /************************************************************************** + + char[] nextBuffer + + Returns a slice of the current outbound buffer, returns a char[] pointing + to null if there is no current outbound buffer **************************************************************************/ - int send(char[] outbuf, IPv4Address addr = null) + char[] nextBuffer() { - if (!connected) + if (out_buffers_len < 1) { - log.info("send: not connected, connecting"); - return -1; + return null; } - if (appendOutBuffer(outbuf)) - { - addEvent(Event.Write); - if (!vat.addConnection(manager)) - { - log.error("unable to register mgr"); - } - } - return 0; + + return out_buffers.head()[o_offset .. $]; } - + void setRegisterFunc( bool delegate (Event) fn) + { + regFn = fn; + } + + void addEvent(Event evt) + { + events |= evt; + } + + void remEvent(Event evt) + { + events &= !evt; + } + private - Vat vat; - Conduit cond; + AsyncSocketConduit cond; Logger log; bool listener; Event events; + bool connected; + CircularList!(char[]) out_buffers; + int out_buffers_len; + int o_offset; + bool delegate (Event) regFn; }