changeset 0:7a315154bf5e

Initial commit
author rick@minifunk
date Sun, 08 Jun 2008 01:45:38 -0400
parents
children b5c7dc3922c6
files .hgignore dreactor/core/ConnectionHandler.d dreactor/core/SelectLoop.d dreactor/protocol/Http11.d dreactor/protocol/Raw.d dreactor/transport/AsyncSocketConduit.d dreactor/util/ThreadSafeQueue.d dsss.conf test/dummy.txt
diffstat 8 files changed, 976 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,8 @@
+syntax: glob
+
+dsss_imports
+dsss_imports/*
+dsss_objects
+dsss_objects/*
+dsss.last
+*.swp
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/core/ConnectionHandler.d	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,362 @@
+module dreactor.core.ConnectionHandler;
+
+import tango.io.selector.model.ISelector;
+import tango.net.Socket.Address;
+import tango.util.collection.CircularSeq;
+
+import dreactor.transport.AsyncSocketConduit;
+
+alias bool delegate(ConnectionHandler) RegisterD;
+
+alias bool delegate(ConnectionHandler, RegisterD)   IncomingHandlerD;
+alias bool delegate(ConnectionHandler, RegisterD)   OutgoingHandlerD;
+alias int delegate(ConnectionHandler, RegisterD)    ErrorHandlerD;
+alias bool delegate(ConnectionHandler, RegisterD)   DisconnectHandlerD;
+alias int delegate(Conduit, RegisterD)              ConnectHandlerD;
+
+alias bool function(ConnectionHandler, RegisterD)   IncomingHandlerF;
+alias bool function(ConnectionHandler, RegisterD)   OutgoingHandlerF;
+alias int function(ConnectionHandler, RegisterD)    ErrorHandlerF;
+alias bool function(ConnectionHandler, RegisterD)   DisconnectHandlerF;
+alias int function(Conduit, RegisterD)              ConnectHandlerF;
+
+
+/******************************************************************************
+    ConnectionHandler object. To be used by the SelectLoop to manage callbacks 
+    for events. It may also be used to buffer data inbetween requests.
+    These can be populated passed to a SelectLoop directly by the end user, 
+    or may be managed by a chosen Protocol. 
+******************************************************************************/
+class ConnectionHandler()
+{
+ public 
+    enum { init, connected, listening, idle,  closing };
+  
+    /**************************************************************************
+
+        Standard Ctor, takes a transport
+
+    **************************************************************************/ 
+    this (Conduit trans, bool islistener = false)
+    {
+        state = State.init;
+        transport = trans;
+        ibuf_len = 0;
+        obuf_len = 0;
+        i_offset = 0;
+        o_offset = 0;
+        isListener = islistener;
+    }
+    
+      
+    /**********************************************************************
+    
+        Setters for the handlers. These are set by the Protocols as well 
+    
+    **********************************************************************/
+    
+    void setIncomingHandler(IncomingHandlerD hand)
+    {
+        inD = hand;
+        inF = null;
+    }
+
+    void setIncomingHandler(IncomingHandlerF hand)
+    {
+        inF = hand;
+        inD = null;
+    }
+
+    void setOutgoingHandler(OutgoingHandlerD hand)
+    {
+        outD = hand;
+        outF = null;
+    }
+
+    void setOutgoingHandler(OutgoingHandlerF hand)
+    {
+        outF = hand;
+        outD = null;
+    }
+
+    void setErrorHandler(ErrorHandlerD hand)
+    {
+        errD = hand;
+        errF = null;
+    }
+
+    void setErrorHandler(ErrorHandlerF hand)
+    {
+        errF = hand;
+        errD = null;
+    }
+
+    void setDisconnectHandler(DisconnectHandlerD hand)
+    {
+        disD = hand;
+        disF = null;
+    }
+
+    void setDisconnectHandler(DisconnectHandlerF hand)
+    {
+        disF = hand;
+        disD = null;
+    }
+
+    void setConnectHandler(ConnectHandlerD hand)
+    {
+        conD = hand;
+        conF = null;
+    }
+
+    void setConnectHandler(ConnectHandlerF hand)
+    {
+        conF = hand;
+        conD = null;
+    }
+
+    /**********************************************************************
+        
+        Handlers to be called by the SelectLoop when events occur
+    
+    **********************************************************************/
+    void handleIncoming(Conduit cond)
+    {
+        if (!inD is null)
+            return inD(cond);
+        else if (!inF is null)
+            return inF(cond);
+    }
+
+    void handleOutgoing(Conduit cond)
+    {
+        if (!outD is null)
+            return outD(cond);
+        else if (!outF is null)
+            return outF(cond);
+    }
+
+    int handleError(Conduit cond)
+    {
+        if (!errD is null)
+            return errD(cond);
+        else if (!errF is null)
+            return errF(cond);
+    }
+
+    int handleDisconnect(Conduit cond)
+    {
+        if (!disD is null)
+            return disD(addr);
+        else if (!disF is null)
+            return disF(addr);
+    }
+
+    int handleConnection(Address addr)
+    {
+        if (!conD is null)
+        {
+            return conD(addr);
+        }
+        else if (!conF is null)
+        {
+            return conF(addr);
+        }
+    }
+
+    /**************************************************************************
+
+        Sending / Receiving helpers
+
+    **************************************************************************/
+
+    /**************************************************************************
+    
+        appendOutBuffer
+
+        Adds an outgoing buffer to the list. This returns true if the list
+        was empty, indicating that the handler should be registered with the
+        SelectLoop. If it returns false, it was probably already registered.
+        
+    **************************************************************************/
+    synchronized void appendOutBuffer(char[] outbuf)
+    {
+        out_buffers.append(outbuf);
+        out_buffers_len++;
+        if (out_buffers_len == 1)
+            return true;
+        else
+            return false;
+    }
+   
+    /**************************************************************************
+
+        addOffset 
+        Use this function to update the offset position after a successful data
+        send. This not only manages the current offset, but will update the 
+        out buffer chain if necessary. 
+
+        Returns: false if there is nothing left to send, true if there is.
+
+    **************************************************************************/ 
+    synchronized bool addOffset(int off)
+    in
+    {
+        assert(out_buffers_len > 0);
+    }
+    body
+    {
+        char[] hd = out_buffers.head();
+        if ((off + o_offset) >= hd.length)
+        {
+            out_buffers.removeHead();
+            o_offset = 0;
+            out_buffers_len--;
+            return (out_buffers_len > 0);
+        }
+        else
+            o_offset += off;
+        return true;
+    }
+
+    /**************************************************************************
+
+        char[] nextBuffer
+
+        Returns a slice of the current outbound buffer, returns a char[] pointing
+        to null if there is no current outbound buffer
+
+    **************************************************************************/
+    synchronized char[] nextBuffer()
+    {
+        if (out_buffers_len < 1)
+        {
+            return null; 
+        }
+
+        return out_buffers.head()[o_offset .. $];
+    }
+    /**************************************************************************
+
+        Configuration functions
+
+    **************************************************************************/
+    Event events()
+    {
+        return events_;
+    }
+    void events(Event e)
+    {
+        events_ = e;
+    }
+    void addEvent(Event e)
+    {
+        events_ |= e;
+    }
+    void remEvent(Event e)
+    {
+        events_ &= ^e;
+    }
+
+    State getState() {return state;}
+    
+    /*
+       connection handlers are left out of this because 
+       this method is used by the listener socket to pass 
+       on its handlers to the accepted socket. An accepted
+       socket will generally do different things onConnection
+    */
+    void setHandlers(ConnectionHandler other)
+    {
+        inD  = other.inD; 
+        outD = other.outD;
+        errD = other.errD;
+        disD = other.disD;
+        inF  = other.inF;
+        outF = other.outF;
+        errF = other.errF;
+        disF = other.disF;
+    }
+
+    /**************************************************************************
+    
+        Freelist allocators and deallocators
+
+    **************************************************************************/
+    static synchronized ConnectionHandler New(Conduit tran, ConnectionHandler other = null)
+    {
+        ConnectionHandler hand;
+	    if (freelist)
+	    {   
+            hand = freelist;
+	        freelist = hand.next;
+            hand.transport = tran;
+	    }
+	    else
+	        hand = new ConnectionHandler(tran);
+
+        if (!other is null)
+        {
+            hand.setHandlers(other);
+        }
+	    return hand;
+    }
+
+    static synchronized void Delete(ConnectionHandler hand)
+    {   
+        hand.next = freelist;
+        freelist = hand.init();
+    } 
+
+private
+    
+    char[SZ] in_buffer;
+    CircularSeq!(char[]) out_buffers;
+    int ibuf_len;
+    int i_offset;
+    int o_offset;
+
+    package Conduit transport;
+    State state;
+    Event events; 
+    IncomingHandlerD    inD;
+    OutgoingHandlerD    outD;
+    ErrorHandlerD       errD;
+    DisconnectHandlerD  disD;
+    ConnectHandlerD     conD;
+
+    IncomingHandlerF    inF;
+    OutgoingHandlerF    outF;
+    ErrorHandlerF       errF;
+    DisconnectHandlerF  disF;
+    ConnectHandlerF     conF;
+    
+    static ConnectionHandler freelist;
+    ConnectionHandler next;
+
+    /**************************************************************************
+        Copy ctor, creates a new ConnectionHandler using the settings
+        of an existing handler. 
+    **************************************************************************/ 
+    void init()
+    {
+        transport = null;
+        state = State.init;
+        ibuf_len = 0;
+        obuf_len = 0;
+        i_offset = 0;
+        o_offset = 0;
+        out_buffer = null;
+        inD  = null;
+        outD = null;
+        errD = null;
+        disD = null;
+        conD = null;
+        inF  = null;
+        outF = null;
+        errF = null;
+        disF = null;
+        conF = null;
+    }
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/core/SelectLoop.d	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,133 @@
+/*******************************************************************************
+
+        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved
+
+        license:        BSD style: $(LICENSE)
+
+        version:        Initial release v0.1 : May 2008
+        
+        author:         Rick Richardson
+
+*******************************************************************************/
+
+
+
+module dreactor.core.SelectLoop;
+
+import dreactor.transport.AsyncSocketConduit;
+import tango.io.selector.Selector;
+import tango.io.selector.model.ISelector;
+import tango.core.Exception;
+import tango.core.Thread;
+import tango.core.Atomic;
+import tango.util.collection.LinkSeq;
+import tango.util.log.Log;
+import tango.util.log.Configurator;
+
+import dreactor.core.ConnectionHandler; 
+
+Logger log = Log.getLogger("dreactor.core.SelectLoop");
+
+static char[] version_string = "SelectLoop.d 0.1 2008-05-31";
+
+class SelectLoop
+{
+private
+    Thread thread;
+    bool running;
+    Atomic!(int) pending;
+   
+    ThreadSafeQueue!(ConnectionHandler) freshList; 
+    //LinkSeq!(ConnectionHandler) connList;  probably not necessary
+
+public    
+    this()
+    {
+        freshList = new ThreadSafeQueue!(ConnectionHandler);
+        //connList = new LinkSeq!(ConnectionHandler);
+        Configurator();
+    }
+
+    void run()
+    {
+        running = true;
+        thread = new Thread(&eventLoop);
+        thread.start();
+    }
+
+    void exit()
+    {
+        running = false;
+    }
+
+    bool addConnection(ConnectionHandler handler)
+    {
+        return freshList.push(handler);       
+    }
+
+private
+    void eventLoop()
+    {
+        auto selector = new Selector();
+        selector.open();
+        auto format = Log.format;         
+        do
+        {
+            auto eventCount = selector.select(10);
+
+            if (eventCount > 0)
+            {
+                // process events
+                foreach (SelectionKey key; selector.selectedSet())
+                {
+                    if (key.isReadable())
+                    {
+                        // incoming data
+                        auto conn = cast(ConnectionHandler) key.attachment;
+                        if ( ConnectionHandler.listening == conn.getState() )
+                            conn.handleConnection(conn.transport, addConnection);
+                        else if ( ! conn.handleIncoming(conn.transport))
+                            unregister(conn.transport);
+                    }
+                    else if (key.isWritable())
+                    {
+                        auto conn = cast(ConnectionHandler) key.attachment;
+                        if ( ! conn.handleOutgoing(conn.transport))
+                            unregister(conn.transport); 
+                    }
+                    else if (key.isHangup())
+                    {
+                        auto conn = cast(ConnectionHandler) key.attachment;
+                        if ( ! conn.handleDisconnect(conn.transport))
+                            unregister(conn.transport);
+                    }
+                    else if (key.isError() || key.isInvalidHandle())
+                    {
+                        // error, close connection
+                        auto conn = cast(ConnectionHandler) key.attachment;
+                        if ( ! conn.handleError(conn.transport, key.isInvalidHandle()))
+                            selector.unregister(conn.transport);
+                    }
+                }
+            }
+            else if (eventCount == 0)
+            {
+                /* can't think of anything useful to do here. */
+            }
+            else
+            {
+                log.error(format("Selector.select returned {}", eventCount));
+            }
+            
+            //add Conduits to listener
+            freshList.processAll( (ConnectionHandler h)
+            {
+                selector.register(conn.transport, conn.events());
+                //connList.append(conn); 
+                return 1; 
+            });
+            //freshList.processAll(reg);
+
+        } while (running)
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/Http11.d	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,5 @@
+module dreactor.protocol.Http11.d;
+
+class Http
+{
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/Raw.d	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,125 @@
+module dreactor.protocol.Raw;
+
+import tango.io.Conduit;
+import tango.io.selector.model.ISelector;
+import dreactor.core.AsyncConduit;
+import dreactor.core.SelectLoop;
+import dreactor.core.ConnectionHandler;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+import tango.util.log.Configurator;
+
+Logger log = Log.getLogger("dreactor.core.SelectLoop");
+
+/******************************************************************************
+    
+    Basic TCP server or client routines for sending raw data.
+
+******************************************************************************/
+class RawListener
+{
+public
+ 
+    this(ConnectionHandler mgr, SelectLoop sel)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        sel.addConnection(mgr);
+        select = sel;
+        children = CircularSeq!(ConnectionHandler);
+        Configurator();
+    }
+ 
+    int accept(Conduit cond)
+    {
+        AsyncConduit newcond = new AsyncConduit;
+        cond.socket().accept(newcond.socket);
+        ConnectionHandler h = ConnectionHandler.New(manager);
+        mgr.events(Event.Read);
+        select.addConnection(mgr);
+        children.append(mgr);
+    }
+
+    bool broadcast(char[] outbuf)
+    {
+        foreach(ConnectionHandler h; children)
+        {
+            if (h.appendBuffer(outbuf))
+            {
+                h.addEvent(Event.Write);
+                select.addConnection(h);
+            }
+        }
+    }
+ 
+    void close()
+    {
+        
+    }
+
+    /**************************************************************************
+
+        send
+        OutgoingHandlerD 
+        To be registered as the response to socket writable event. 
+        Sends data, returns amount sent. Unregisters Handler for sending
+        if there is no more data left to send. 
+
+    ***************************************************************************/
+    int send(ConnectionHandler h, RegisterD reg)
+    {
+        char[] outbuf = h.nextBuffer();
+        if (!outbuf is null)
+        {
+            int sent = h.transport.write(outbuf);
+            if (sent > 0)
+            {
+                if (! h.addOffset(sent))
+                {
+                    h.removeEvent(Event.write);
+                    reg(h);
+                }
+            }
+            else if (sent == EOF)
+            {
+                // EAGAIN ? probably shouldn't have happened. 
+            }
+            else
+            {
+               log.error("Socket send return ERR"); 
+            }
+            return sent;
+        }
+        return 0;
+    }
+
+    /**************************************************************************
+    
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    int receive(ConnectionHandler h, RegisterD reg)
+    {
+    }
+
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+    CircularSeq!(ConnectionHandler) children;
+}
+
+class RawClient
+{
+    ConnectionHandler manager;
+    SelectLoop select;
+ 
+    this(ConnectionHandler mgr, SelectLoop sel)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        sel.addConnection(mgr);
+        select = sel;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/transport/AsyncSocketConduit.d	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,258 @@
+/*******************************************************************************
+
+        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved
+
+        license:        BSD style: $(LICENSE)
+
+        version:        Mar 2004 : Initial release
+        version:        Jan 2005 : RedShodan patch for timeout query
+        version:        Dec 2006 : Outback release
+        
+        author:         Kris, modified by Rick Richardson (May 2008)
+
+*******************************************************************************/
+
+module dreactor.transport.AsyncSocketConduit;
+
+private import  tango.time.Time;
+
+public  import  tango.io.Conduit;
+
+private import  tango.net.Socket;
+
+/*******************************************************************************
+
+        A wrapper around the bare Socket to implement the IConduit abstraction
+        and add socket-specific functionality specifically for multiplexing via 
+        poll and the ilk.
+
+        AsyncSocketConduit data-transfer is typically performed in conjunction with
+        an IBuffer, but can happily be handled directly using void array where
+        preferred
+        
+*******************************************************************************/
+
+class AsyncSocketConduit : Conduit
+{
+        package Socket                  socket_;
+
+        /***********************************************************************
+        
+                Create a streaming Internet Socket
+
+        ***********************************************************************/
+        /* overriding the enum from the IConduit interface */
+        enum : uint 
+        {
+                Eof = uint.max, /// the End-of-Flow identifer
+                Err = uint.max -1 // some error ocurred, Should disconnect
+        }
+
+        this ()
+        {
+                this (SocketType.STREAM, ProtocolType.TCP);
+        }
+
+        /***********************************************************************
+        
+                Create an Internet Socket. Used by subclasses and by
+                ServerSocket; the latter via method allocate() below
+
+        ***********************************************************************/
+
+        protected this (SocketType type, ProtocolType protocol, bool create=true)
+        {
+                socket_ = new Socket (AddressFamily.INET, type, protocol, create);
+        }
+
+        /***********************************************************************
+
+                Return the name of this device
+
+        ***********************************************************************/
+
+        override char[] toString()
+        {
+                return socket.toString;
+        }
+
+        /***********************************************************************
+
+                Return the socket wrapper
+                
+        ***********************************************************************/
+
+        Socket socket ()
+        {
+                return socket_;
+        }
+
+        /***********************************************************************
+
+                Return a preferred size for buffering conduit I/O
+
+        ***********************************************************************/
+
+        override uint bufferSize ()
+        {
+                return 1024 * 8;
+        }
+
+        /***********************************************************************
+
+                Models a handle-oriented device.
+
+                TODO: figure out how to avoid exposing this in the general
+                case
+
+        ***********************************************************************/
+
+        override Handle fileHandle ()
+        {
+                return cast(Handle) socket_.fileHandle;
+        }
+
+        /***********************************************************************
+
+                Is this socket still alive?
+
+        ***********************************************************************/
+
+        override bool isAlive ()
+        {
+                return socket_.isAlive;
+        }
+
+        /***********************************************************************
+
+                Connect to the provided endpoint
+        
+        ***********************************************************************/
+
+        AsyncSocketConduit connect (Address addr)
+        {
+                socket_.connect (addr);
+                return this;
+        }
+
+        /***********************************************************************
+
+                Bind the socket. This is typically used to configure a
+                listening socket (such as a server or multicast socket).
+                The address given should describe a local adapter, or
+                specify the port alone (ADDR_ANY) to have the OS assign
+                a local adapter address.
+        
+        ***********************************************************************/
+
+        AsyncSocketConduit bind (Address address)
+        {
+                socket_.bind (address);
+                return this;
+        }
+
+        /***********************************************************************
+
+                Inform other end of a connected socket that we're no longer
+                available. In general, this should be invoked before close()
+                is invoked
+        
+                The shutdown function shuts down the connection of the socket: 
+
+                    -   stops receiving data for this socket. If further data 
+                        arrives, it is rejected.
+
+                    -   stops trying to transmit data from this socket. Also
+                        discards any data waiting to be sent. Stop looking for 
+                        acknowledgement of data already sent; don't retransmit 
+                        if any data is lost.
+
+        ***********************************************************************/
+
+        AsyncSocketConduit shutdown ()
+        {
+                socket_.shutdown (SocketShutdown.BOTH);
+                return this;
+        }
+
+        /***********************************************************************
+
+                Release this AsyncSocketConduit
+
+                Note that one should always disconnect a AsyncSocketConduit 
+                under normal conditions, and generally invoke shutdown 
+                on all connected sockets beforehand
+
+        ***********************************************************************/
+
+        override void detach ()
+        {
+                socket_.detach;
+
+                // deallocate if this came from the free-list,
+                // otherwise just wait for the GC to handle it
+                if (fromList)
+                    deallocate (this);
+        }
+
+       /***********************************************************************
+                
+                Read content from the socket. 
+
+                Returns the number of bytes read from the socket, or
+                IConduit.Eof where there's no more content available
+
+                Note that a timeout is equivalent to Eof. Isolating
+                a timeout condition can be achieved via hadTimeout()
+
+                Note also that a zero return value is not legitimate;
+                such a value indicates Eof
+
+        ***********************************************************************/
+
+        override uint read (void[] dst)
+        {
+                return read (dst, (void[] dst){return socket_.receive(dst);});
+        }
+        
+        /***********************************************************************
+
+                Callback routine to write the provided content to the
+                socket. This will stall until the socket responds in
+                some manner. Returns the number of bytes sent to the
+                output, or IConduit.Eof if the socket cannot write.
+
+        ***********************************************************************/
+
+        override uint write (void[] src)
+        {
+                int count = socket_.send (src);
+                if (count == 0)
+                    count = Eof;
+                else if (count < 0)
+                    count = Err;
+                return count;
+        }
+
+        /***********************************************************************
+ 
+                Internal routine to handle socket read under a timeout.
+                Note that this is synchronized, in order to serialize
+                socket access
+
+        ***********************************************************************/
+
+        package final uint read (void[] dst, int delegate(void[]) dg)
+        {
+                // invoke the actual read op
+                int count = dg (dst);
+                if (count == 0)
+                    return Eof;
+                else if (count < 0)
+                    return Err;
+              
+                return count;
+        }
+        
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/util/ThreadSafeQueue.d	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,84 @@
+module dreactor.util.ThreadSafeQueue;
+
+import tango.util.collection.CircularSeq;
+
+/******************************************************************************
+
+    ThreadSafeQueue
+    Queue that is probably thread safe. It acts as a job queue, in that 
+    you can push or pop off of the queue. Or you can processAll, which will
+    apply a delegate to each item, then clear the list. 
+
+******************************************************************************/
+class ThreadSafeQueue(TYPE)
+{
+public
+    this(int maxsz = 1000)
+    {
+        list = new CircularSeq!(TYPE);
+        maxsize = maxsz;
+        size = 0;
+    }
+
+    synchronized bool pop(ref TYPE t)
+    {
+        if (list.size())
+        {
+            TYPE t = list.head();
+            list.removeHead();
+            --size;
+            return true;
+        }
+        else
+            return false;
+    }
+
+    synchronized bool push(TYPE t)
+    {
+        if (list.size < maxsize)
+        {
+            list.append(t);
+            ++size;
+            return true;
+        }
+        else
+            return false;
+    }
+
+    synchronized int size()
+    {
+        return size();
+    }
+
+    synchronized int processAll(int delegate(ref T value) dg)
+    {
+        int count = 0;
+        foreach(T t; list)
+        {
+            if (dg(t) < 0)
+                break;
+            ++count;
+        }
+        if (count == size)
+            clear_();
+        else
+            list.removeRange(0, count);
+        return count;
+    }
+
+    synchronized void clear()
+    {
+        clear_();
+    }
+
+private 
+
+    void clear_()
+    {
+        list.removeAll();
+    }
+
+    CircularSeq!(TYPE) list;
+    int maxsize;
+    int size;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dsss.conf	Sun Jun 08 01:45:38 2008 -0400
@@ -0,0 +1,1 @@
+[dreactor]