# HG changeset patch # User rick@minifunk # Date 1215530469 14400 # Node ID e3dbc92088224db93e806a0cf90e1e25cd15c0c7 # Parent d3374d55398687b072f3f7458e62b989e74eb1b7 basic tests working diff -r d3374d553986 -r e3dbc9208822 dreactor/core/ConnectionHandler.d --- a/dreactor/core/ConnectionHandler.d Thu Jun 12 23:12:17 2008 -0400 +++ b/dreactor/core/ConnectionHandler.d Tue Jul 08 11:21:09 2008 -0400 @@ -1,24 +1,27 @@ module dreactor.core.ConnectionHandler; import tango.io.selector.model.ISelector; -import tango.net.Socket.Address; import tango.util.collection.CircularSeq; +import tango.net.Socket; +public import tango.core.Exception; +import dreactor.transport.AsyncSocketConduit; -import dreactor.transport.AsyncSocketConduit; +import tango.util.log.Log; +import tango.util.log.Config; alias bool delegate(ConnectionHandler) RegisterD; -alias bool delegate(ConnectionHandler, RegisterD) IncomingHandlerD; -alias bool delegate(ConnectionHandler, RegisterD) OutgoingHandlerD; +alias int delegate(ConnectionHandler) IncomingHandlerD; +alias int delegate(ConnectionHandler) OutgoingHandlerD; alias int delegate(ConnectionHandler, RegisterD) ErrorHandlerD; -alias bool delegate(ConnectionHandler, RegisterD) DisconnectHandlerD; -alias int delegate(Conduit, RegisterD) ConnectHandlerD; +alias int delegate(ConnectionHandler) DisconnectHandlerD; +alias int delegate(Conduit, RegisterD) ConnectHandlerD; -alias bool function(ConnectionHandler, RegisterD) IncomingHandlerF; -alias bool function(ConnectionHandler, RegisterD) OutgoingHandlerF; +alias int function(ConnectionHandler) IncomingHandlerF; +alias int function(ConnectionHandler) OutgoingHandlerF; alias int function(ConnectionHandler, RegisterD) ErrorHandlerF; -alias bool function(ConnectionHandler, RegisterD) DisconnectHandlerF; -alias int function(Conduit, RegisterD) ConnectHandlerF; +alias int function(ConnectionHandler) DisconnectHandlerF; +alias int function(Conduit, RegisterD) ConnectHandlerF; /****************************************************************************** @@ -27,23 +30,24 @@ These can be populated passed to a SelectLoop directly by the end user, or may be managed by a chosen Protocol. ******************************************************************************/ -class ConnectionHandler() +class ConnectionHandler { public - enum { init, connected, listening, idle, closing }; + enum State { init, connected, listening, idle, closing }; /************************************************************************** - Standard Ctor, takes a transport + Standard Ctor, takes a transport_ **************************************************************************/ this (Conduit trans, bool listener = false) { - transport = trans; + transport_ = trans; ibuf_len = 0; - obuf_len = 0; i_offset = 0; o_offset = 0; + out_buffers = new CircularSeq!(char[]); + log = Log.lookup("dreactor.core.ConnectionHandler"); } /********************************************************************** @@ -117,47 +121,51 @@ Handlers to be called by the SelectLoop when events occur **********************************************************************/ - void handleIncoming(Conduit cond) + int handleIncoming() { - if (!inD is null) - return inD(cond); - else if (!inF is null) - return inF(cond); + if (inD !is null) + return inD(this); + else if (inF !is null) + return inF(this); + else + throw new Exception("no Incoming handler set"); } - void handleOutgoing(Conduit cond) + int handleOutgoing() { - if (!outD is null) - return outD(cond); - else if (!outF is null) - return outF(cond); + if (outD !is null) + return outD(this); + else if (outF !is null) + return outF(this); + else + throw new Exception("no Outgoing handler set"); } - int handleError(Conduit cond) + int handleError(RegisterD reg) { - if (!errD is null) - return errD(cond); - else if (!errF is null) - return errF(cond); + if (errD !is null) + return errD(this, reg); + else if (errF !is null) + return errF(this, reg); } - int handleDisconnect(Conduit cond) + int handleDisconnect() { - if (!disD is null) - return disD(addr); - else if (!disF is null) - return disF(addr); + if (disD !is null) + return disD(this); + else if (disF !is null) + return disF(this); } - int handleConnection(Address addr) + int handleConnection(Conduit cond, RegisterD reg ) { - if (!conD is null) + if (conD !is null) { - return conD(addr); + return conD(cond, reg); } - else if (!conF is null) + else if (conF !is null) { - return conF(addr); + return conF(cond, reg); } } @@ -176,7 +184,7 @@ SelectLoop. If it returns false, it was probably already registered. **************************************************************************/ - synchronized void appendOutBuffer(char[] outbuf) + synchronized bool appendOutBuffer(char[] outbuf) { out_buffers.append(outbuf); out_buffers_len++; @@ -242,12 +250,15 @@ **************************************************************************/ int listen(IPv4Address addr) { - transport.bind().listen(); - state = listening; - setConnectionHandler() + (cast(AsyncSocketConduit)transport_).bind(addr).listen(); + state_ = State.listening; + return 0; } - + Conduit transport() + { + return transport_; + } /************************************************************************** Configuration functions @@ -263,14 +274,16 @@ } void addEvent(Event e) { + log.trace("events_ before: {}", events_); events_ |= e; + log.trace("events_ after: {}", events_); } void remEvent(Event e) { - events_ ^= e; + events_ &= ~e; } - State getState() {return state;} + State getState() {return state_;} /* connection handlers are left out of this because @@ -302,12 +315,12 @@ { hand = freelist; freelist = hand.next; - hand.transport = tran; + hand.transport_ = tran; } else hand = new ConnectionHandler(tran); - if (!other is null) + if (!(other is null)) { hand.setHandlers(other); } @@ -317,20 +330,22 @@ static synchronized void Delete(ConnectionHandler hand) { hand.next = freelist; - freelist = hand.init(); + freelist = hand.initialize(); } private - - char[SZ] in_buffer; + + char[] in_buffer; CircularSeq!(char[]) out_buffers; + int out_buffers_len; int ibuf_len; int i_offset; int o_offset; + Logger log; - package Conduit transport; - State state; - Event events; + package Conduit transport_; + State state_; + Event events_; IncomingHandlerD inD; OutgoingHandlerD outD; ErrorHandlerD errD; @@ -350,15 +365,14 @@ Copy ctor, creates a new ConnectionHandler using the settings of an existing handler. **************************************************************************/ - void init() + ConnectionHandler initialize() { - transport = null; - state = State.init; + transport_ = null; + state_ = State.init; ibuf_len = 0; - obuf_len = 0; i_offset = 0; o_offset = 0; - out_buffer = null; + out_buffers.clear(); inD = null; outD = null; errD = null; @@ -369,6 +383,7 @@ errF = null; disF = null; conF = null; + return this; } } diff -r d3374d553986 -r e3dbc9208822 dreactor/core/SelectLoop.d --- a/dreactor/core/SelectLoop.d Thu Jun 12 23:12:17 2008 -0400 +++ b/dreactor/core/SelectLoop.d Tue Jul 08 11:21:09 2008 -0400 @@ -12,7 +12,6 @@ module dreactor.core.SelectLoop; -import dreactor.transport.AsyncSocketConduit; import tango.io.selector.Selector; import tango.io.selector.model.ISelector; import tango.core.Exception; @@ -20,11 +19,14 @@ import tango.core.Atomic; import tango.util.collection.LinkSeq; import tango.util.log.Log; -import tango.util.log.Configurator; -import dreactor.core.ConnectionHandler; +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.ConnectionHandler; +import dreactor.util.ThreadSafeQueue; -Logger log = Log.getLogger("dreactor.core.SelectLoop"); +Logger log; + +enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; static char[] version_string = "SelectLoop.d 0.1 2008-05-31"; @@ -34,16 +36,15 @@ Thread thread; bool running; Atomic!(int) pending; - + ThreadSafeQueue!(ConnectionHandler) freshList; - //LinkSeq!(ConnectionHandler) connList; probably not necessary - -public + ThreadSafeQueue!(ConnectionHandler) remList; +public this() { freshList = new ThreadSafeQueue!(ConnectionHandler); - //connList = new LinkSeq!(ConnectionHandler); - Configurator(); + remList = new ThreadSafeQueue!(ConnectionHandler); + log = Log.lookup("dreactor.core.SelectLoop"); } void run() @@ -60,18 +61,23 @@ bool addConnection(ConnectionHandler handler) { + log.trace("adding handler"); return freshList.push(handler); } + + bool remConnection(ConnectionHandler handler) + { + return remList.push(handler); + } private void eventLoop() { auto selector = new Selector(); selector.open(); - auto format = Log.format; do { - auto eventCount = selector.select(10); + auto eventCount = selector.select(0.01); if (eventCount > 0) { @@ -81,30 +87,30 @@ if (key.isReadable()) { // incoming data + log.trace("Read event fired"); auto conn = cast(ConnectionHandler) key.attachment; - if ( ConnectionHandler.listening == conn.getState() ) - conn.handleConnection(conn.transport, addConnection); - else if ( ! conn.handleIncoming(conn.transport)) - unregister(conn.transport); + if ( ConnectionHandler.State.listening == conn.getState() ) + conn.handleConnection(conn.transport, &addConnection); + else + processReturn(conn.handleIncoming(), selector, conn); } else if (key.isWritable()) { + log.trace("Write event fired"); auto conn = cast(ConnectionHandler) key.attachment; - if ( ! conn.handleOutgoing(conn.transport)) - unregister(conn.transport); + processReturn(conn.handleOutgoing(), selector, conn); } else if (key.isHangup()) { auto conn = cast(ConnectionHandler) key.attachment; - if ( ! conn.handleDisconnect(conn.transport)) - unregister(conn.transport); + processReturn(conn.handleDisconnect(), selector, conn); } else if (key.isError() || key.isInvalidHandle()) { + log.trace("Error event fired"); // error, close connection auto conn = cast(ConnectionHandler) key.attachment; - if ( ! conn.handleError(conn.transport, key.isInvalidHandle())) - selector.unregister(conn.transport); + conn.handleError(&remConnection); } } } @@ -114,18 +120,44 @@ } else { - log.error(format("Selector.select returned {}", eventCount)); + log.error("Selector.select returned {}", eventCount); } - //add Conduits to listener - freshList.processAll( (ConnectionHandler h) + freshList.processAll( (ref ConnectionHandler h) { - selector.register(conn.transport, conn.events()); - //connList.append(conn); + log.trace("reregistering transport for event {}", h.events()); + selector.reregister(h.transport, h.events(), h); return 1; }); - //freshList.processAll(reg); + remList.processAll( (ref ConnectionHandler h) + { + selector.unregister(h.transport); + return 1; + }); } while (running) + + log.trace("done with while loop"); + } + + void processReturn(int result, Selector s, ConnectionHandler h) + { + switch(result) + { + case UNREGISTER: + s.unregister(h.transport); + break; + case REMAIN: + //this space intentially left blank + break; + case REGISTER: + s.register(h.transport, h.events(), h); + break; + case REREGISTER: + s.reregister(h.transport, h.events(), h); + break; + default: + log.error("unknown return value"); + } } } diff -r d3374d553986 -r e3dbc9208822 dreactor/transport/AsyncSocketConduit.d --- a/dreactor/transport/AsyncSocketConduit.d Thu Jun 12 23:12:17 2008 -0400 +++ b/dreactor/transport/AsyncSocketConduit.d Tue Jul 08 11:21:09 2008 -0400 @@ -20,8 +20,6 @@ private import tango.net.Socket; -import tango.net.Socket.Address; - /******************************************************************************* A wrapper around the bare Socket to implement the IConduit abstraction @@ -33,23 +31,16 @@ preferred *******************************************************************************/ -alias IPv4Address Address; class AsyncSocketConduit : Conduit { - package Socket socket_; + package Socket socket_; /*********************************************************************** Create a streaming Internet Socket ***********************************************************************/ - /* overriding the enum from the IConduit interface */ - enum : uint - { - Eof = uint.max, /// the End-of-Flow identifer - Err = uint.max -1 // some error ocurred, Should disconnect - } this () { @@ -65,7 +56,8 @@ protected this (SocketType type, ProtocolType protocol, bool create=true) { - socket_ = new Socket (AddressFamily.INET, type, protocol, create); + socket_ = new Socket (AddressFamily.INET, type, protocol, create); + socket_.blocking(false); } /*********************************************************************** @@ -159,9 +151,9 @@ Enable the socket for listening **************************************************************************/ - AsyncSocketConduit listen() + AsyncSocketConduit listen(int backlog = 255) { - socket_.listen(); + socket_.listen(backlog); return this; } @@ -202,11 +194,6 @@ override void detach () { socket_.detach; - - // deallocate if this came from the free-list, - // otherwise just wait for the GC to handle it - if (fromList) - deallocate (this); } /*********************************************************************** @@ -215,58 +202,26 @@ Returns the number of bytes read from the socket, or IConduit.Eof where there's no more content available - - Note that a timeout is equivalent to Eof. Isolating - a timeout condition can be achieved via hadTimeout() - - Note also that a zero return value is not legitimate; - such a value indicates Eof + + Return IConduit.Eof if there is an error with the socket. ***********************************************************************/ - override uint read (void[] dst) { - return read (dst, (void[] dst){return socket_.receive(dst);}); + // invoke the actual read op + return socket_.receive(dst); } + /*********************************************************************** Callback routine to write the provided content to the - socket. This will stall until the socket responds in - some manner. Returns the number of bytes sent to the - output, or IConduit.Eof if the socket cannot write. - + socket. ***********************************************************************/ override uint write (void[] src) { - int count = socket_.send (src); - if (count == 0) - count = Eof; - else if (count < 0) - count = Err; - return count; + return socket_.send (src); } - - /*********************************************************************** - - Internal routine to handle socket read under a timeout. - Note that this is synchronized, in order to serialize - socket access +} - ***********************************************************************/ - - package final uint read (void[] dst, int delegate(void[]) dg) - { - // invoke the actual read op - int count = dg (dst); - if (count == 0) - return Eof; - else if (count < 0) - return Err; - - return count; - } - - } - diff -r d3374d553986 -r e3dbc9208822 dreactor/util/ThreadSafeQueue.d --- a/dreactor/util/ThreadSafeQueue.d Thu Jun 12 23:12:17 2008 -0400 +++ b/dreactor/util/ThreadSafeQueue.d Tue Jul 08 11:21:09 2008 -0400 @@ -1,7 +1,10 @@ module dreactor.util.ThreadSafeQueue; import tango.util.collection.CircularSeq; +import tango.core.Atomic; +import tango.util.log.Log; +import tango.util.log.Config; /****************************************************************************** ThreadSafeQueue @@ -15,18 +18,19 @@ public this(int maxsz = 1000) { - list = new CircularSeq!(TYPE); - maxsize = maxsz; - size = 0; + list_ = new CircularSeq!(TYPE); + maxsize_ = maxsz; + size_ = 0; + log = Log.lookup("dreactor.util.ThreadSafeQueue"); } synchronized bool pop(ref TYPE t) { - if (list.size()) + if (size_ > 0) { - TYPE t = list.head(); - list.removeHead(); - --size; + t = list_.head(); + list_.removeHead(); + size_--; return true; } else @@ -35,10 +39,10 @@ synchronized bool push(TYPE t) { - if (list.size < maxsize) + if (size_ < maxsize_) { - list.append(t); - ++size; + list_.append(t); + size_++; return true; } else @@ -47,22 +51,31 @@ synchronized int size() { - return size(); + return size_; } - synchronized int processAll(int delegate(ref T value) dg) + synchronized int processAll(int delegate(ref TYPE value) dg) { + if (0 >= size_) + return 0; + int count = 0; - foreach(T t; list) + foreach(TYPE t; list_) { if (dg(t) < 0) break; ++count; } - if (count == size) + if (count == size_) + { clear_(); + size_ = 0; + } else - list.removeRange(0, count); + { + list_.removeRange(0, count); + size_ -= count; + } return count; } @@ -75,10 +88,12 @@ void clear_() { - list.removeAll(); + list_.clear(); + size_ = 0 ; } - CircularSeq!(TYPE) list; - int maxsize; - int size; + int maxsize_; + int size_; + Logger log; + CircularSeq!(TYPE) list_; } diff -r d3374d553986 -r e3dbc9208822 dsss.conf --- a/dsss.conf Thu Jun 12 23:12:17 2008 -0400 +++ b/dsss.conf Tue Jul 08 11:21:09 2008 -0400 @@ -1,2 +1,3 @@ [test/test.d] +buildFlags=-debug -gc