# HG changeset patch # User rick@minifunk # Date 1215534206 14400 # Node ID 287ba7de97c450f86acae50a7f60c65e4a6b4e7e # Parent f875a1f278b84345d7e167b40640be094672570c more housekeeping diff -r f875a1f278b8 -r 287ba7de97c4 dreactor/core/ConnectionHandler.d --- a/dreactor/core/ConnectionHandler.d Tue Jul 08 12:16:07 2008 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,387 +0,0 @@ -module dreactor.core.ConnectionHandler; - -import tango.io.selector.model.ISelector; -import tango.util.collection.CircularSeq; -import tango.net.Socket; -public import tango.core.Exception; -import dreactor.transport.AsyncSocketConduit; - -import tango.util.log.Log; -import tango.util.log.Config; - -alias bool delegate(ConnectionHandler) RegisterD; - -alias int delegate(ConnectionHandler) IncomingHandlerD; -alias int delegate(ConnectionHandler) OutgoingHandlerD; -alias int delegate(ConnectionHandler, RegisterD) ErrorHandlerD; -alias int delegate(ConnectionHandler) DisconnectHandlerD; -alias int delegate(Conduit, RegisterD) ConnectHandlerD; - -alias int function(ConnectionHandler) IncomingHandlerF; -alias int function(ConnectionHandler) OutgoingHandlerF; -alias int function(ConnectionHandler, RegisterD) ErrorHandlerF; -alias int function(ConnectionHandler) DisconnectHandlerF; -alias int function(Conduit, RegisterD) ConnectHandlerF; - - -/****************************************************************************** - ConnectionHandler object. To be used by the SelectLoop to manage callbacks - for events. It may also be used to buffer data inbetween requests. - These can be populated passed to a SelectLoop directly by the end user, - or may be managed by a chosen Protocol. -******************************************************************************/ -class ConnectionHandler -{ - public - enum State { init, connected, listening, idle, closing }; - - /************************************************************************** - - Standard Ctor, takes a transport_ - - **************************************************************************/ - this (Conduit trans, bool listener = false) - { - transport_ = trans; - ibuf_len = 0; - i_offset = 0; - o_offset = 0; - out_buffers = new CircularSeq!(char[]); - log = Log.lookup("dreactor.core.ConnectionHandler"); - } - - /********************************************************************** - - Setters for the handlers. These are set by the Protocols as well - - **********************************************************************/ - - void setIncomingHandler(IncomingHandlerD hand) - { - inD = hand; - inF = null; - } - - void setIncomingHandler(IncomingHandlerF hand) - { - inF = hand; - inD = null; - } - - void setOutgoingHandler(OutgoingHandlerD hand) - { - outD = hand; - outF = null; - } - - void setOutgoingHandler(OutgoingHandlerF hand) - { - outF = hand; - outD = null; - } - - void setErrorHandler(ErrorHandlerD hand) - { - errD = hand; - errF = null; - } - - void setErrorHandler(ErrorHandlerF hand) - { - errF = hand; - errD = null; - } - - void setDisconnectHandler(DisconnectHandlerD hand) - { - disD = hand; - disF = null; - } - - void setDisconnectHandler(DisconnectHandlerF hand) - { - disF = hand; - disD = null; - } - - void setConnectHandler(ConnectHandlerD hand) - { - conD = hand; - conF = null; - } - - void setConnectHandler(ConnectHandlerF hand) - { - conF = hand; - conD = null; - } - - /********************************************************************** - - Handlers to be called by the SelectLoop when events occur - - **********************************************************************/ - int handleIncoming() - { - if (inD !is null) - return inD(this); - else if (inF !is null) - return inF(this); - else - throw new Exception("no Incoming handler set"); - } - - int handleOutgoing() - { - if (outD !is null) - return outD(this); - else if (outF !is null) - return outF(this); - else - throw new Exception("no Outgoing handler set"); - } - - int handleError(RegisterD reg) - { - if (errD !is null) - return errD(this, reg); - else if (errF !is null) - return errF(this, reg); - } - - int handleDisconnect() - { - if (disD !is null) - return disD(this); - else if (disF !is null) - return disF(this); - } - - int handleConnection(Conduit cond, RegisterD reg ) - { - if (conD !is null) - { - return conD(cond, reg); - } - else if (conF !is null) - { - return conF(cond, reg); - } - } - - /************************************************************************** - - Sending / Receiving helpers - - **************************************************************************/ - - /************************************************************************** - - appendOutBuffer - - Adds an outgoing buffer to the list. This returns true if the list - was empty, indicating that the handler should be registered with the - SelectLoop. If it returns false, it was probably already registered. - - **************************************************************************/ - synchronized bool appendOutBuffer(char[] outbuf) - { - out_buffers.append(outbuf); - out_buffers_len++; - if (out_buffers_len == 1) - return true; - else - return false; - } - - /************************************************************************** - - addOffset - Use this function to update the offset position after a successful data - send. This not only manages the current offset, but will update the - out buffer chain if necessary. - - Returns: false if there is nothing left to send, true if there is. - - **************************************************************************/ - synchronized bool addOffset(int off) - in - { - assert(out_buffers_len > 0); - } - body - { - char[] hd = out_buffers.head(); - if ((off + o_offset) >= hd.length) - { - out_buffers.removeHead(); - o_offset = 0; - out_buffers_len--; - return (out_buffers_len > 0); - } - else - o_offset += off; - return true; - } - - /************************************************************************** - - char[] nextBuffer - - Returns a slice of the current outbound buffer, returns a char[] pointing - to null if there is no current outbound buffer - - **************************************************************************/ - synchronized char[] nextBuffer() - { - if (out_buffers_len < 1) - { - return null; - } - - return out_buffers.head()[o_offset .. $]; - } - - /************************************************************************** - - listen - Enable listening on the socket attached to this connectionhandler - - **************************************************************************/ - int listen(IPv4Address addr) - { - (cast(AsyncSocketConduit)transport_).bind(addr).listen(); - state_ = State.listening; - return 0; - } - - Conduit transport() - { - return transport_; - } - /************************************************************************** - - Configuration functions - - **************************************************************************/ - Event events() - { - return events_; - } - void events(Event e) - { - events_ = e; - } - void addEvent(Event e) - { - events_ |= e; - } - void remEvent(Event e) - { - events_ &= ~e; - } - - State getState() {return state_;} - - /* - connection handlers are left out of this because - this method is used by the listener socket to pass - on its handlers to the accepted socket. An accepted - socket will generally do different things onConnection - */ - void setHandlers(ConnectionHandler other) - { - inD = other.inD; - outD = other.outD; - errD = other.errD; - disD = other.disD; - inF = other.inF; - outF = other.outF; - errF = other.errF; - disF = other.disF; - } - - /************************************************************************** - - Freelist allocators and deallocators - - **************************************************************************/ - static synchronized ConnectionHandler New(Conduit tran, ConnectionHandler other = null) - { - ConnectionHandler hand; - if (freelist) - { - hand = freelist; - freelist = hand.next; - hand.transport_ = tran; - } - else - hand = new ConnectionHandler(tran); - - if (!(other is null)) - { - hand.setHandlers(other); - } - return hand; - } - - static synchronized void Delete(ConnectionHandler hand) - { - hand.next = freelist; - freelist = hand.initialize(); - } - -private - - char[] in_buffer; - CircularSeq!(char[]) out_buffers; - int out_buffers_len; - int ibuf_len; - int i_offset; - int o_offset; - Logger log; - - package Conduit transport_; - State state_; - Event events_; - IncomingHandlerD inD; - OutgoingHandlerD outD; - ErrorHandlerD errD; - DisconnectHandlerD disD; - ConnectHandlerD conD; - - IncomingHandlerF inF; - OutgoingHandlerF outF; - ErrorHandlerF errF; - DisconnectHandlerF disF; - ConnectHandlerF conF; - - static ConnectionHandler freelist; - ConnectionHandler next; - - /************************************************************************** - Copy ctor, creates a new ConnectionHandler using the settings - of an existing handler. - **************************************************************************/ - ConnectionHandler initialize() - { - transport_ = null; - state_ = State.init; - ibuf_len = 0; - i_offset = 0; - o_offset = 0; - out_buffers.clear(); - inD = null; - outD = null; - errD = null; - disD = null; - conD = null; - inF = null; - outF = null; - errF = null; - disF = null; - conF = null; - return this; - } -} - diff -r f875a1f278b8 -r 287ba7de97c4 dreactor/core/Dispatcher.d --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/Dispatcher.d Tue Jul 08 12:23:26 2008 -0400 @@ -0,0 +1,387 @@ +module dreactor.core.Dispatcher; + +import tango.io.selector.model.ISelector; +import tango.util.collection.CircularSeq; +import tango.net.Socket; +public import tango.core.Exception; +import dreactor.transport.AsyncSocketConduit; + +import tango.util.log.Log; +import tango.util.log.Config; + +alias bool delegate(Dispatcher) RegisterD; + +alias int delegate(Dispatcher) IncomingHandlerD; +alias int delegate(Dispatcher) OutgoingHandlerD; +alias int delegate(Dispatcher, RegisterD) ErrorHandlerD; +alias int delegate(Dispatcher) DisconnectHandlerD; +alias int delegate(Conduit, RegisterD) ConnectHandlerD; + +alias int function(Dispatcher) IncomingHandlerF; +alias int function(Dispatcher) OutgoingHandlerF; +alias int function(Dispatcher, RegisterD) ErrorHandlerF; +alias int function(Dispatcher) DisconnectHandlerF; +alias int function(Conduit, RegisterD) ConnectHandlerF; + + +/****************************************************************************** + Dispatcher object. To be used by the SelectLoop to manage callbacks + for events. It may also be used to buffer data inbetween requests. + These can be populated passed to a SelectLoop directly by the end user, + or may be managed by a chosen Protocol. +******************************************************************************/ +class Dispatcher +{ + public + enum State { init, connected, listening, idle, closing }; + + /************************************************************************** + + Standard Ctor, takes a transport_ + + **************************************************************************/ + this (Conduit trans, bool listener = false) + { + transport_ = trans; + ibuf_len = 0; + i_offset = 0; + o_offset = 0; + out_buffers = new CircularSeq!(char[]); + log = Log.lookup("dreactor.core.Dispatcher"); + } + + /********************************************************************** + + Setters for the handlers. These are set by the Protocols as well + + **********************************************************************/ + + void setIncomingHandler(IncomingHandlerD hand) + { + inD = hand; + inF = null; + } + + void setIncomingHandler(IncomingHandlerF hand) + { + inF = hand; + inD = null; + } + + void setOutgoingHandler(OutgoingHandlerD hand) + { + outD = hand; + outF = null; + } + + void setOutgoingHandler(OutgoingHandlerF hand) + { + outF = hand; + outD = null; + } + + void setErrorHandler(ErrorHandlerD hand) + { + errD = hand; + errF = null; + } + + void setErrorHandler(ErrorHandlerF hand) + { + errF = hand; + errD = null; + } + + void setDisconnectHandler(DisconnectHandlerD hand) + { + disD = hand; + disF = null; + } + + void setDisconnectHandler(DisconnectHandlerF hand) + { + disF = hand; + disD = null; + } + + void setConnectHandler(ConnectHandlerD hand) + { + conD = hand; + conF = null; + } + + void setConnectHandler(ConnectHandlerF hand) + { + conF = hand; + conD = null; + } + + /********************************************************************** + + Handlers to be called by the SelectLoop when events occur + + **********************************************************************/ + int handleIncoming() + { + if (inD !is null) + return inD(this); + else if (inF !is null) + return inF(this); + else + throw new Exception("no Incoming handler set"); + } + + int handleOutgoing() + { + if (outD !is null) + return outD(this); + else if (outF !is null) + return outF(this); + else + throw new Exception("no Outgoing handler set"); + } + + int handleError(RegisterD reg) + { + if (errD !is null) + return errD(this, reg); + else if (errF !is null) + return errF(this, reg); + } + + int handleDisconnect() + { + if (disD !is null) + return disD(this); + else if (disF !is null) + return disF(this); + } + + int handleConnection(Conduit cond, RegisterD reg ) + { + if (conD !is null) + { + return conD(cond, reg); + } + else if (conF !is null) + { + return conF(cond, reg); + } + } + + /************************************************************************** + + Sending / Receiving helpers + + **************************************************************************/ + + /************************************************************************** + + appendOutBuffer + + Adds an outgoing buffer to the list. This returns true if the list + was empty, indicating that the handler should be registered with the + SelectLoop. If it returns false, it was probably already registered. + + **************************************************************************/ + synchronized bool appendOutBuffer(char[] outbuf) + { + out_buffers.append(outbuf); + out_buffers_len++; + if (out_buffers_len == 1) + return true; + else + return false; + } + + /************************************************************************** + + addOffset + Use this function to update the offset position after a successful data + send. This not only manages the current offset, but will update the + out buffer chain if necessary. + + Returns: false if there is nothing left to send, true if there is. + + **************************************************************************/ + synchronized bool addOffset(int off) + in + { + assert(out_buffers_len > 0); + } + body + { + char[] hd = out_buffers.head(); + if ((off + o_offset) >= hd.length) + { + out_buffers.removeHead(); + o_offset = 0; + out_buffers_len--; + return (out_buffers_len > 0); + } + else + o_offset += off; + return true; + } + + /************************************************************************** + + char[] nextBuffer + + Returns a slice of the current outbound buffer, returns a char[] pointing + to null if there is no current outbound buffer + + **************************************************************************/ + synchronized char[] nextBuffer() + { + if (out_buffers_len < 1) + { + return null; + } + + return out_buffers.head()[o_offset .. $]; + } + + /************************************************************************** + + listen + Enable listening on the socket attached to this connectionhandler + + **************************************************************************/ + int listen(IPv4Address addr) + { + (cast(AsyncSocketConduit)transport_).bind(addr).listen(); + state_ = State.listening; + return 0; + } + + Conduit transport() + { + return transport_; + } + /************************************************************************** + + Configuration functions + + **************************************************************************/ + Event events() + { + return events_; + } + void events(Event e) + { + events_ = e; + } + void addEvent(Event e) + { + events_ |= e; + } + void remEvent(Event e) + { + events_ &= ~e; + } + + State getState() {return state_;} + + /* + connection handlers are left out of this because + this method is used by the listener socket to pass + on its handlers to the accepted socket. An accepted + socket will generally do different things onConnection + */ + void setHandlers(Dispatcher other) + { + inD = other.inD; + outD = other.outD; + errD = other.errD; + disD = other.disD; + inF = other.inF; + outF = other.outF; + errF = other.errF; + disF = other.disF; + } + + /************************************************************************** + + Freelist allocators and deallocators + + **************************************************************************/ + static synchronized Dispatcher New(Conduit tran, Dispatcher other = null) + { + Dispatcher hand; + if (freelist) + { + hand = freelist; + freelist = hand.next; + hand.transport_ = tran; + } + else + hand = new Dispatcher(tran); + + if (!(other is null)) + { + hand.setHandlers(other); + } + return hand; + } + + static synchronized void Delete(Dispatcher hand) + { + hand.next = freelist; + freelist = hand.initialize(); + } + +private + + char[] in_buffer; + CircularSeq!(char[]) out_buffers; + int out_buffers_len; + int ibuf_len; + int i_offset; + int o_offset; + Logger log; + + package Conduit transport_; + State state_; + Event events_; + IncomingHandlerD inD; + OutgoingHandlerD outD; + ErrorHandlerD errD; + DisconnectHandlerD disD; + ConnectHandlerD conD; + + IncomingHandlerF inF; + OutgoingHandlerF outF; + ErrorHandlerF errF; + DisconnectHandlerF disF; + ConnectHandlerF conF; + + static Dispatcher freelist; + Dispatcher next; + + /************************************************************************** + Copy ctor, creates a new Dispatcher using the settings + of an existing handler. + **************************************************************************/ + Dispatcher initialize() + { + transport_ = null; + state_ = State.init; + ibuf_len = 0; + i_offset = 0; + o_offset = 0; + out_buffers.clear(); + inD = null; + outD = null; + errD = null; + disD = null; + conD = null; + inF = null; + outF = null; + errF = null; + disF = null; + conF = null; + return this; + } +} + diff -r f875a1f278b8 -r 287ba7de97c4 dreactor/core/Vat.d --- a/dreactor/core/Vat.d Tue Jul 08 12:16:07 2008 -0400 +++ b/dreactor/core/Vat.d Tue Jul 08 12:23:26 2008 -0400 @@ -21,7 +21,7 @@ import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; -import dreactor.core.ConnectionHandler; +import dreactor.core.Dispatcher; import dreactor.util.ThreadSafeQueue; Logger log; @@ -37,13 +37,13 @@ bool running; Atomic!(int) pending; - ThreadSafeQueue!(ConnectionHandler) freshList; - ThreadSafeQueue!(ConnectionHandler) remList; + ThreadSafeQueue!(Dispatcher) freshList; + ThreadSafeQueue!(Dispatcher) remList; public this() { - freshList = new ThreadSafeQueue!(ConnectionHandler); - remList = new ThreadSafeQueue!(ConnectionHandler); + freshList = new ThreadSafeQueue!(Dispatcher); + remList = new ThreadSafeQueue!(Dispatcher); log = Log.lookup("dreactor.core.Vat"); } @@ -59,13 +59,13 @@ running = false; } - bool addConnection(ConnectionHandler handler) + bool addConnection(Dispatcher handler) { log.trace("adding handler"); return freshList.push(handler); } - bool remConnection(ConnectionHandler handler) + bool remConnection(Dispatcher handler) { return remList.push(handler); } @@ -88,8 +88,8 @@ { // incoming data log.trace("Read event fired"); - auto conn = cast(ConnectionHandler) key.attachment; - if ( ConnectionHandler.State.listening == conn.getState() ) + auto conn = cast(Dispatcher) key.attachment; + if ( Dispatcher.State.listening == conn.getState() ) conn.handleConnection(conn.transport, &addConnection); else processReturn(conn.handleIncoming(), selector, conn); @@ -97,20 +97,20 @@ else if (key.isWritable()) { log.trace("Write event fired"); - auto conn = cast(ConnectionHandler) key.attachment; + auto conn = cast(Dispatcher) key.attachment; processReturn(conn.handleOutgoing(), selector, conn); } else if (key.isHangup()) { log.trace("Hangup event fired"); - auto conn = cast(ConnectionHandler) key.attachment; + auto conn = cast(Dispatcher) key.attachment; processReturn(conn.handleDisconnect(), selector, conn); } else if (key.isError() || key.isInvalidHandle()) { log.trace("Error event fired"); // error, close connection - auto conn = cast(ConnectionHandler) key.attachment; + auto conn = cast(Dispatcher) key.attachment; conn.handleError(&remConnection); } } @@ -124,12 +124,12 @@ log.error("Selector.select returned {}", eventCount); } //add Conduits to listener - freshList.processAll( (ref ConnectionHandler h) + freshList.processAll( (ref Dispatcher h) { selector.reregister(h.transport, h.events(), h); return 1; }); - remList.processAll( (ref ConnectionHandler h) + remList.processAll( (ref Dispatcher h) { selector.unregister(h.transport); return 1; @@ -139,7 +139,7 @@ } - void processReturn(int result, Selector s, ConnectionHandler h) + void processReturn(int result, Selector s, Dispatcher h) { switch(result) { diff -r f875a1f278b8 -r 287ba7de97c4 dreactor/protocol/RawTcp.d --- a/dreactor/protocol/RawTcp.d Tue Jul 08 12:16:07 2008 -0400 +++ b/dreactor/protocol/RawTcp.d Tue Jul 08 12:23:26 2008 -0400 @@ -9,7 +9,7 @@ import dreactor.transport.AsyncSocketConduit; import dreactor.core.Vat; -import dreactor.core.ConnectionHandler; +import dreactor.core.Dispatcher; /****************************************************************************** @@ -20,7 +20,7 @@ { public Logger log; - this(ConnectionHandler mgr, Vat sel, IPv4Address addr) + this(Dispatcher mgr, Vat sel, IPv4Address addr) { manager = mgr; mgr.events(Event.Read); @@ -33,14 +33,14 @@ vat = sel; log = Log.lookup("dreactor.protocol.RawTcpServer"); log.info("log initialized"); - children = new CircularSeq!(ConnectionHandler); + children = new CircularSeq!(Dispatcher); } int accept(Conduit cond, RegisterD reg) { AsyncSocketConduit newcond = new AsyncSocketConduit; (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket); - ConnectionHandler h = ConnectionHandler.New(newcond, manager); + Dispatcher h = Dispatcher.New(newcond, manager); h.events(Event.Read); vat.addConnection(h); children.append(h); @@ -50,7 +50,7 @@ int broadcast(char[] outbuf) { - foreach(ConnectionHandler h; children) + foreach(Dispatcher h; children) { if (h.appendOutBuffer(outbuf)) { @@ -67,16 +67,16 @@ } private - ConnectionHandler manager; + Dispatcher manager; Vat vat; - CircularSeq!(ConnectionHandler) children; + CircularSeq!(Dispatcher) children; } class RawTCPClient { public Logger log; - this(ConnectionHandler mgr, Vat sel, Event evts = Event.Read) + this(Dispatcher mgr, Vat sel, Event evts = Event.Read) { manager = mgr; manager.events(evts); @@ -130,7 +130,7 @@ } private - ConnectionHandler manager; + Dispatcher manager; Vat vat; bool connected; } @@ -152,7 +152,7 @@ if there is no more data left to send. ***************************************************************************/ -public static int onSend(ConnectionHandler h) +public static int onSend(Dispatcher h) { Logger log = Log.lookup("Handlers.onSend"); @@ -192,7 +192,7 @@ Default incoming data handler. Should be replaced with something useful. **************************************************************************/ -public static int onReceive(ConnectionHandler h) +public static int onReceive(Dispatcher h) { Logger log = Log.lookup("Handlers.onReceive"); diff -r f875a1f278b8 -r 287ba7de97c4 test/test Binary file test/test has changed diff -r f875a1f278b8 -r 287ba7de97c4 test/test.d --- a/test/test.d Tue Jul 08 12:16:07 2008 -0400 +++ b/test/test.d Tue Jul 08 12:23:26 2008 -0400 @@ -4,20 +4,20 @@ import tango.core.Thread; import tango.io.Stdout; import dreactor.core.Vat; -import dreactor.core.ConnectionHandler; +import dreactor.core.Dispatcher; import dreactor.protocol.RawTcp; import dreactor.transport.AsyncSocketConduit; int main() { AsyncSocketConduit cond = new AsyncSocketConduit; - ConnectionHandler lh = new ConnectionHandler(cond, true); + Dispatcher lh = new Dispatcher(cond, true); Vat l_vat = new Vat(); RawTCPListener listener = new RawTCPListener(lh, l_vat, new IPv4Address(5555)); l_vat.run(); AsyncSocketConduit clcond = new AsyncSocketConduit; - ConnectionHandler ch = new ConnectionHandler(clcond); + Dispatcher ch = new Dispatcher(clcond); Vat c_vat = new Vat(); RawTCPClient client = new RawTCPClient(ch, c_vat); c_vat.run();