Mercurial > projects > dreactor
diff asyncdreactor/core/Dispatcher.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | dreactor/core/Dispatcher.d@5412a1ff2e49 |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/asyncdreactor/core/Dispatcher.d Tue Aug 12 16:59:56 2008 -0400 @@ -0,0 +1,387 @@ +module dreactor.core.Dispatcher; + +import tango.io.selector.model.ISelector; +import tango.util.collection.CircularSeq; +import tango.net.Socket; +public import tango.core.Exception; +import dreactor.transport.AsyncSocketConduit; + +import tango.util.log.Log; +import tango.util.log.Config; + +alias bool delegate(Dispatcher) RegisterD; + +alias int delegate(Dispatcher) IncomingHandlerD; +alias int delegate(Dispatcher) OutgoingHandlerD; +alias int delegate(Dispatcher, RegisterD) ErrorHandlerD; +alias int delegate(Dispatcher) DisconnectHandlerD; +alias int delegate(Conduit, RegisterD) ConnectHandlerD; + +alias int function(Dispatcher) IncomingHandlerF; +alias int function(Dispatcher) OutgoingHandlerF; +alias int function(Dispatcher, RegisterD) ErrorHandlerF; +alias int function(Dispatcher) DisconnectHandlerF; +alias int function(Conduit, RegisterD) ConnectHandlerF; + + +/****************************************************************************** + Dispatcher object. To be used by the SelectLoop to manage callbacks + for events. It may also be used to buffer data inbetween requests. + These can be populated passed to a SelectLoop directly by the end user, + or may be managed by a chosen Protocol. +******************************************************************************/ +class Dispatcher +{ + public + enum State { init, connected, listening, idle, closing }; + + /************************************************************************** + + Standard Ctor, takes a transport_ + + **************************************************************************/ + this (Conduit trans, bool listener = false) + { + transport_ = trans; + ibuf_len = 0; + i_offset = 0; + o_offset = 0; + out_buffers = new CircularSeq!(char[]); + log = Log.lookup("dreactor.core.Dispatcher"); + } + + /********************************************************************** + + Setters for the handlers. These are set by the Protocols as well + + **********************************************************************/ + + void setIncomingHandler(IncomingHandlerD hand) + { + inD = hand; + inF = null; + } + + void setIncomingHandler(IncomingHandlerF hand) + { + inF = hand; + inD = null; + } + + void setOutgoingHandler(OutgoingHandlerD hand) + { + outD = hand; + outF = null; + } + + void setOutgoingHandler(OutgoingHandlerF hand) + { + outF = hand; + outD = null; + } + + void setErrorHandler(ErrorHandlerD hand) + { + errD = hand; + errF = null; + } + + void setErrorHandler(ErrorHandlerF hand) + { + errF = hand; + errD = null; + } + + void setDisconnectHandler(DisconnectHandlerD hand) + { + disD = hand; + disF = null; + } + + void setDisconnectHandler(DisconnectHandlerF hand) + { + disF = hand; + disD = null; + } + + void setConnectHandler(ConnectHandlerD hand) + { + conD = hand; + conF = null; + } + + void setConnectHandler(ConnectHandlerF hand) + { + conF = hand; + conD = null; + } + + /********************************************************************** + + Handlers to be called by the SelectLoop when events occur + + **********************************************************************/ + int handleIncoming() + { + if (inD !is null) + return inD(this); + else if (inF !is null) + return inF(this); + else + throw new Exception("no Incoming handler set"); + } + + int handleOutgoing() + { + if (outD !is null) + return outD(this); + else if (outF !is null) + return outF(this); + else + throw new Exception("no Outgoing handler set"); + } + + int handleError(RegisterD reg) + { + if (errD !is null) + return errD(this, reg); + else if (errF !is null) + return errF(this, reg); + } + + int handleDisconnect() + { + if (disD !is null) + return disD(this); + else if (disF !is null) + return disF(this); + } + + int handleConnection(Conduit cond, RegisterD reg ) + { + if (conD !is null) + { + return conD(cond, reg); + } + else if (conF !is null) + { + return conF(cond, reg); + } + } + + /************************************************************************** + + Sending / Receiving helpers + + **************************************************************************/ + + /************************************************************************** + + appendOutBuffer + + Adds an outgoing buffer to the list. This returns true if the list + was empty, indicating that the handler should be registered with the + SelectLoop. If it returns false, it was probably already registered. + + **************************************************************************/ + synchronized bool appendOutBuffer(char[] outbuf) + { + out_buffers.append(outbuf); + out_buffers_len++; + if (out_buffers_len == 1) + return true; + else + return false; + } + + /************************************************************************** + + addOffset + Use this function to update the offset position after a successful data + send. This not only manages the current offset, but will update the + out buffer chain if necessary. + + Returns: false if there is nothing left to send, true if there is. + + **************************************************************************/ + synchronized bool addOffset(int off) + in + { + assert(out_buffers_len > 0); + } + body + { + char[] hd = out_buffers.head(); + if ((off + o_offset) >= hd.length) + { + out_buffers.removeHead(); + o_offset = 0; + out_buffers_len--; + return (out_buffers_len > 0); + } + else + o_offset += off; + return true; + } + + /************************************************************************** + + char[] nextBuffer + + Returns a slice of the current outbound buffer, returns a char[] pointing + to null if there is no current outbound buffer + + **************************************************************************/ + synchronized char[] nextBuffer() + { + if (out_buffers_len < 1) + { + return null; + } + + return out_buffers.head()[o_offset .. $]; + } + + /************************************************************************** + + listen + Enable listening on the socket attached to this connectionhandler + + **************************************************************************/ + int listen(IPv4Address addr) + { + (cast(AsyncSocketConduit)transport_).bind(addr).listen(); + state_ = State.listening; + return 0; + } + + Conduit transport() + { + return transport_; + } + /************************************************************************** + + Configuration functions + + **************************************************************************/ + Event events() + { + return events_; + } + void events(Event e) + { + events_ = e; + } + void addEvent(Event e) + { + events_ |= e; + } + void remEvent(Event e) + { + events_ &= ~e; + } + + State getState() {return state_;} + + /* + connection handlers are left out of this because + this method is used by the listener socket to pass + on its handlers to the accepted socket. An accepted + socket will generally do different things onConnection + */ + void setHandlers(Dispatcher other) + { + inD = other.inD; + outD = other.outD; + errD = other.errD; + disD = other.disD; + inF = other.inF; + outF = other.outF; + errF = other.errF; + disF = other.disF; + } + + /************************************************************************** + + Freelist allocators and deallocators + + **************************************************************************/ + static synchronized Dispatcher New(Conduit tran, Dispatcher other = null) + { + Dispatcher hand; + if (freelist) + { + hand = freelist; + freelist = hand.next; + hand.transport_ = tran; + } + else + hand = new Dispatcher(tran); + + if (!(other is null)) + { + hand.setHandlers(other); + } + return hand; + } + + static synchronized void Delete(Dispatcher hand) + { + hand.next = freelist; + freelist = hand.initialize(); + } + +private + + char[] in_buffer; + CircularSeq!(char[]) out_buffers; + int out_buffers_len; + int ibuf_len; + int i_offset; + int o_offset; + Logger log; + + package Conduit transport_; + State state_; + Event events_; + IncomingHandlerD inD; + OutgoingHandlerD outD; + ErrorHandlerD errD; + DisconnectHandlerD disD; + ConnectHandlerD conD; + + IncomingHandlerF inF; + OutgoingHandlerF outF; + ErrorHandlerF errF; + DisconnectHandlerF disF; + ConnectHandlerF conF; + + static Dispatcher freelist; + Dispatcher next; + + /************************************************************************** + Copy ctor, creates a new Dispatcher using the settings + of an existing handler. + **************************************************************************/ + Dispatcher initialize() + { + transport_ = null; + state_ = State.init; + ibuf_len = 0; + i_offset = 0; + o_offset = 0; + out_buffers.clear(); + inD = null; + outD = null; + errD = null; + disD = null; + conD = null; + inF = null; + outF = null; + errF = null; + disF = null; + conF = null; + return this; + } +} +