changeset 3:e3dbc9208822

basic tests working
author rick@minifunk
date Tue, 08 Jul 2008 11:21:09 -0400
parents d3374d553986
children f8b01c9f7114
files dreactor/core/ConnectionHandler.d dreactor/core/SelectLoop.d dreactor/transport/AsyncSocketConduit.d dreactor/util/ThreadSafeQueue.d dsss.conf
diffstat 5 files changed, 183 insertions(+), 165 deletions(-) [+]
line wrap: on
line diff
--- a/dreactor/core/ConnectionHandler.d	Thu Jun 12 23:12:17 2008 -0400
+++ b/dreactor/core/ConnectionHandler.d	Tue Jul 08 11:21:09 2008 -0400
@@ -1,24 +1,27 @@
 module dreactor.core.ConnectionHandler;
 
 import tango.io.selector.model.ISelector;
-import tango.net.Socket.Address;
 import tango.util.collection.CircularSeq;
+import tango.net.Socket;
+public  import  tango.core.Exception;
+import dreactor.transport.AsyncSocketConduit;
 
-import dreactor.transport.AsyncSocketConduit;
+import tango.util.log.Log;
+import tango.util.log.Config;
 
 alias bool delegate(ConnectionHandler) RegisterD;
 
-alias bool delegate(ConnectionHandler, RegisterD)   IncomingHandlerD;
-alias bool delegate(ConnectionHandler, RegisterD)   OutgoingHandlerD;
+alias int delegate(ConnectionHandler)   IncomingHandlerD;
+alias int delegate(ConnectionHandler)   OutgoingHandlerD;
 alias int delegate(ConnectionHandler, RegisterD)    ErrorHandlerD;
-alias bool delegate(ConnectionHandler, RegisterD)   DisconnectHandlerD;
-alias int delegate(Conduit, RegisterD)              ConnectHandlerD;
+alias int delegate(ConnectionHandler)   DisconnectHandlerD;
+alias int delegate(Conduit, RegisterD)  ConnectHandlerD;
 
-alias bool function(ConnectionHandler, RegisterD)   IncomingHandlerF;
-alias bool function(ConnectionHandler, RegisterD)   OutgoingHandlerF;
+alias int function(ConnectionHandler)   IncomingHandlerF;
+alias int function(ConnectionHandler)   OutgoingHandlerF;
 alias int function(ConnectionHandler, RegisterD)    ErrorHandlerF;
-alias bool function(ConnectionHandler, RegisterD)   DisconnectHandlerF;
-alias int function(Conduit, RegisterD)              ConnectHandlerF;
+alias int function(ConnectionHandler)   DisconnectHandlerF;
+alias int function(Conduit, RegisterD)  ConnectHandlerF;
 
 
 /******************************************************************************
@@ -27,23 +30,24 @@
     These can be populated passed to a SelectLoop directly by the end user, 
     or may be managed by a chosen Protocol. 
 ******************************************************************************/
-class ConnectionHandler()
+class ConnectionHandler
 {
  public 
-    enum { init, connected, listening, idle,  closing };
+    enum State { init, connected, listening, idle,  closing };
   
     /**************************************************************************
 
-        Standard Ctor, takes a transport
+        Standard Ctor, takes a transport_
 
     **************************************************************************/ 
     this (Conduit trans, bool listener = false)
     {
-        transport = trans;
+        transport_ = trans;
         ibuf_len = 0;
-        obuf_len = 0;
         i_offset = 0;
         o_offset = 0;
+        out_buffers = new CircularSeq!(char[]);
+        log = Log.lookup("dreactor.core.ConnectionHandler");
     }
       
     /**********************************************************************
@@ -117,47 +121,51 @@
         Handlers to be called by the SelectLoop when events occur
     
     **********************************************************************/
-    void handleIncoming(Conduit cond)
+    int handleIncoming()
     {
-        if (!inD is null)
-            return inD(cond);
-        else if (!inF is null)
-            return inF(cond);
+        if (inD !is null)
+            return inD(this);
+        else if (inF !is null)
+            return inF(this);
+        else 
+            throw new Exception("no Incoming handler set");
     }
 
-    void handleOutgoing(Conduit cond)
+    int handleOutgoing()
     {
-        if (!outD is null)
-            return outD(cond);
-        else if (!outF is null)
-            return outF(cond);
+        if (outD !is null)
+            return outD(this);
+        else if (outF !is null)
+            return outF(this);
+        else 
+            throw new Exception("no Outgoing handler set");
     }
 
-    int handleError(Conduit cond)
+    int handleError(RegisterD reg)
     {
-        if (!errD is null)
-            return errD(cond);
-        else if (!errF is null)
-            return errF(cond);
+        if (errD !is null)
+            return errD(this, reg);
+        else if (errF !is null)
+            return errF(this, reg);
     }
 
-    int handleDisconnect(Conduit cond)
+    int handleDisconnect()
     {
-        if (!disD is null)
-            return disD(addr);
-        else if (!disF is null)
-            return disF(addr);
+        if (disD !is null)
+            return disD(this);
+        else if (disF !is null)
+            return disF(this);
     }
 
-    int handleConnection(Address addr)
+    int handleConnection(Conduit cond, RegisterD reg )
     {
-        if (!conD is null)
+        if (conD !is null)
         {
-            return conD(addr);
+            return conD(cond, reg);
         }
-        else if (!conF is null)
+        else if (conF !is null)
         {
-            return conF(addr);
+            return conF(cond, reg);
         }
     }
 
@@ -176,7 +184,7 @@
         SelectLoop. If it returns false, it was probably already registered.
         
     **************************************************************************/
-    synchronized void appendOutBuffer(char[] outbuf)
+    synchronized bool appendOutBuffer(char[] outbuf)
     {
         out_buffers.append(outbuf);
         out_buffers_len++;
@@ -242,12 +250,15 @@
     **************************************************************************/
     int listen(IPv4Address addr)
     {
-        transport.bind().listen();
-        state = listening;
-        setConnectionHandler()
+        (cast(AsyncSocketConduit)transport_).bind(addr).listen();
+        state_ = State.listening;
+        return 0;
     }
 
-
+    Conduit transport()
+    {
+        return transport_;
+    }
     /**************************************************************************
 
         Configuration functions
@@ -263,14 +274,16 @@
     }
     void addEvent(Event e)
     {
+        log.trace("events_ before: {}", events_);
         events_ |= e;
+        log.trace("events_ after: {}", events_);
     }
     void remEvent(Event e)
     {
-        events_ ^= e;
+        events_ &= ~e;
     }
 
-    State getState() {return state;}
+    State getState() {return state_;}
     
     /*
        connection handlers are left out of this because 
@@ -302,12 +315,12 @@
 	    {   
             hand = freelist;
 	        freelist = hand.next;
-            hand.transport = tran;
+            hand.transport_ = tran;
 	    }
 	    else
 	        hand = new ConnectionHandler(tran);
 
-        if (!other is null)
+        if (!(other is null))
         {
             hand.setHandlers(other);
         }
@@ -317,20 +330,22 @@
     static synchronized void Delete(ConnectionHandler hand)
     {   
         hand.next = freelist;
-        freelist = hand.init();
+        freelist = hand.initialize();
     } 
 
 private
-    
-    char[SZ] in_buffer;
+   
+    char[] in_buffer; 
     CircularSeq!(char[]) out_buffers;
+    int out_buffers_len;
     int ibuf_len;
     int i_offset;
     int o_offset;
+    Logger log; 
 
-    package Conduit transport;
-    State state;
-    Event events; 
+    package Conduit transport_;
+    State state_;
+    Event events_; 
     IncomingHandlerD    inD;
     OutgoingHandlerD    outD;
     ErrorHandlerD       errD;
@@ -350,15 +365,14 @@
         Copy ctor, creates a new ConnectionHandler using the settings
         of an existing handler. 
     **************************************************************************/ 
-    void init()
+    ConnectionHandler initialize()
     {
-        transport = null;
-        state = State.init;
+        transport_ = null;
+        state_ = State.init;
         ibuf_len = 0;
-        obuf_len = 0;
         i_offset = 0;
         o_offset = 0;
-        out_buffer = null;
+        out_buffers.clear();
         inD  = null;
         outD = null;
         errD = null;
@@ -369,6 +383,7 @@
         errF = null;
         disF = null;
         conF = null;
+        return this;
     }
 }
 
--- a/dreactor/core/SelectLoop.d	Thu Jun 12 23:12:17 2008 -0400
+++ b/dreactor/core/SelectLoop.d	Tue Jul 08 11:21:09 2008 -0400
@@ -12,7 +12,6 @@
 
 module dreactor.core.SelectLoop;
 
-import dreactor.transport.AsyncSocketConduit;
 import tango.io.selector.Selector;
 import tango.io.selector.model.ISelector;
 import tango.core.Exception;
@@ -20,11 +19,14 @@
 import tango.core.Atomic;
 import tango.util.collection.LinkSeq;
 import tango.util.log.Log;
-import tango.util.log.Configurator;
 
-import dreactor.core.ConnectionHandler; 
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.ConnectionHandler;
+import dreactor.util.ThreadSafeQueue;
 
-Logger log = Log.getLogger("dreactor.core.SelectLoop");
+Logger log;
+
+enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
 
 static char[] version_string = "SelectLoop.d 0.1 2008-05-31";
 
@@ -34,16 +36,15 @@
     Thread thread;
     bool running;
     Atomic!(int) pending;
-   
+ 
     ThreadSafeQueue!(ConnectionHandler) freshList; 
-    //LinkSeq!(ConnectionHandler) connList;  probably not necessary
-
-public    
+    ThreadSafeQueue!(ConnectionHandler) remList;
+public 
     this()
     {
         freshList = new ThreadSafeQueue!(ConnectionHandler);
-        //connList = new LinkSeq!(ConnectionHandler);
-        Configurator();
+        remList = new ThreadSafeQueue!(ConnectionHandler);
+        log = Log.lookup("dreactor.core.SelectLoop");
     }
 
     void run()
@@ -60,18 +61,23 @@
 
     bool addConnection(ConnectionHandler handler)
     {
+        log.trace("adding handler");
         return freshList.push(handler);       
     }
+     
+    bool remConnection(ConnectionHandler handler)
+    {
+        return remList.push(handler);
+    }
 
 private
     void eventLoop()
     {
         auto selector = new Selector();
         selector.open();
-        auto format = Log.format;         
         do
         {
-            auto eventCount = selector.select(10);
+            auto eventCount = selector.select(0.01);
 
             if (eventCount > 0)
             {
@@ -81,30 +87,30 @@
                     if (key.isReadable())
                     {
                         // incoming data
+                        log.trace("Read event fired");    
                         auto conn = cast(ConnectionHandler) key.attachment;
-                        if ( ConnectionHandler.listening == conn.getState() )
-                            conn.handleConnection(conn.transport, addConnection);
-                        else if ( ! conn.handleIncoming(conn.transport))
-                            unregister(conn.transport);
+                        if ( ConnectionHandler.State.listening == conn.getState() )
+                            conn.handleConnection(conn.transport, &addConnection);
+                        else
+                            processReturn(conn.handleIncoming(), selector, conn);
                     }
                     else if (key.isWritable())
                     {
+                        log.trace("Write event fired");    
                         auto conn = cast(ConnectionHandler) key.attachment;
-                        if ( ! conn.handleOutgoing(conn.transport))
-                            unregister(conn.transport); 
+                        processReturn(conn.handleOutgoing(), selector, conn);
                     }
                     else if (key.isHangup())
                     {
                         auto conn = cast(ConnectionHandler) key.attachment;
-                        if ( ! conn.handleDisconnect(conn.transport))
-                            unregister(conn.transport);
+                        processReturn(conn.handleDisconnect(), selector, conn);
                     }
                     else if (key.isError() || key.isInvalidHandle())
                     {
+                        log.trace("Error event fired");    
                         // error, close connection
                         auto conn = cast(ConnectionHandler) key.attachment;
-                        if ( ! conn.handleError(conn.transport, key.isInvalidHandle()))
-                            selector.unregister(conn.transport);
+                        conn.handleError(&remConnection);
                     }
                 }
             }
@@ -114,18 +120,44 @@
             }
             else
             {
-                log.error(format("Selector.select returned {}", eventCount));
+                log.error("Selector.select returned {}", eventCount);
             }
-            
             //add Conduits to listener
-            freshList.processAll( (ConnectionHandler h)
+            freshList.processAll( (ref ConnectionHandler h)
             {
-                selector.register(conn.transport, conn.events());
-                //connList.append(conn); 
+                log.trace("reregistering transport for event {}", h.events());
+                selector.reregister(h.transport, h.events(), h);
                 return 1; 
             });
-            //freshList.processAll(reg);
+            remList.processAll( (ref ConnectionHandler h)
+            {
+                selector.unregister(h.transport);
+                return 1;
+            });
 
         } while (running)
+
+        log.trace("done with while loop");
+    }
+
+    void processReturn(int result, Selector s, ConnectionHandler h)
+    {
+        switch(result)
+        {
+            case UNREGISTER:
+                s.unregister(h.transport);
+            break;
+            case REMAIN:
+                //this space intentially left blank
+            break;
+            case REGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            case REREGISTER:
+                s.reregister(h.transport, h.events(), h);
+            break;
+            default:
+                log.error("unknown return value");
+        }
     }
 }
--- a/dreactor/transport/AsyncSocketConduit.d	Thu Jun 12 23:12:17 2008 -0400
+++ b/dreactor/transport/AsyncSocketConduit.d	Tue Jul 08 11:21:09 2008 -0400
@@ -20,8 +20,6 @@
 
 private import  tango.net.Socket;
 
-import tango.net.Socket.Address;
- 
 /*******************************************************************************
 
   A wrapper around the bare Socket to implement the IConduit abstraction
@@ -33,23 +31,16 @@
   preferred
 
  *******************************************************************************/
-alias IPv4Address Address;
 
 class AsyncSocketConduit : Conduit
 {
-    package Socket                  socket_;
+    package Socket socket_;
 
     /***********************************************************************
 
       Create a streaming Internet Socket
 
      ***********************************************************************/
-    /* overriding the enum from the IConduit interface */
-    enum : uint 
-    {
-        Eof = uint.max, /// the End-of-Flow identifer
-        Err = uint.max -1 // some error ocurred, Should disconnect
-    }
 
     this ()
     {
@@ -65,7 +56,8 @@
 
     protected this (SocketType type, ProtocolType protocol, bool create=true)
     {
-       socket_ = new Socket (AddressFamily.INET, type, protocol, create);
+        socket_ = new Socket (AddressFamily.INET, type, protocol, create);
+        socket_.blocking(false);
     }
 
     /***********************************************************************
@@ -159,9 +151,9 @@
         Enable the socket for listening
 
     **************************************************************************/
-    AsyncSocketConduit listen()
+    AsyncSocketConduit listen(int backlog = 255)
     {
-        socket_.listen();
+        socket_.listen(backlog);
         return this;
     }
 
@@ -202,11 +194,6 @@
     override void detach ()
     {
        socket_.detach;
-
-       // deallocate if this came from the free-list,
-       // otherwise just wait for the GC to handle it
-       if (fromList)
-           deallocate (this);
     }
 
     /***********************************************************************
@@ -215,58 +202,26 @@
 
      Returns the number of bytes read from the socket, or
      IConduit.Eof where there's no more content available
-
-     Note that a timeout is equivalent to Eof. Isolating
-     a timeout condition can be achieved via hadTimeout()
-
-     Note also that a zero return value is not legitimate;
-     such a value indicates Eof
+     
+     Return IConduit.Eof if there is an error with the socket.
 
     ***********************************************************************/
-
     override uint read (void[] dst)
     {
-       return read (dst, (void[] dst){return socket_.receive(dst);});
+       // invoke the actual read op
+       return socket_.receive(dst);
     }
 
+
     /***********************************************************************
 
      Callback routine to write the provided content to the
-     socket. This will stall until the socket responds in
-     some manner. Returns the number of bytes sent to the
-     output, or IConduit.Eof if the socket cannot write.
-
+     socket. 
     ***********************************************************************/
 
     override uint write (void[] src)
     {
-       int count = socket_.send (src);
-       if (count == 0)
-           count = Eof;
-       else if (count < 0)
-           count = Err;
-       return count;
+       return socket_.send (src);
     }
-
-    /***********************************************************************
-
-     Internal routine to handle socket read under a timeout.
-     Note that this is synchronized, in order to serialize
-     socket access
+}
 
-    ***********************************************************************/
-
-    package final uint read (void[] dst, int delegate(void[]) dg)
-    {
-       // invoke the actual read op
-       int count = dg (dst);
-       if (count == 0)
-           return Eof;
-       else if (count < 0)
-           return Err;
-
-       return count;
-    }
-
-    }
-
--- a/dreactor/util/ThreadSafeQueue.d	Thu Jun 12 23:12:17 2008 -0400
+++ b/dreactor/util/ThreadSafeQueue.d	Tue Jul 08 11:21:09 2008 -0400
@@ -1,7 +1,10 @@
 module dreactor.util.ThreadSafeQueue;
 
 import tango.util.collection.CircularSeq;
+import tango.core.Atomic;
 
+import tango.util.log.Log;
+import tango.util.log.Config;
 /******************************************************************************
 
     ThreadSafeQueue
@@ -15,18 +18,19 @@
 public
     this(int maxsz = 1000)
     {
-        list = new CircularSeq!(TYPE);
-        maxsize = maxsz;
-        size = 0;
+        list_ = new CircularSeq!(TYPE);
+        maxsize_ = maxsz;
+        size_ = 0;
+        log = Log.lookup("dreactor.util.ThreadSafeQueue");
     }
 
     synchronized bool pop(ref TYPE t)
     {
-        if (list.size())
+        if (size_ > 0)
         {
-            TYPE t = list.head();
-            list.removeHead();
-            --size;
+            t = list_.head();
+            list_.removeHead();
+            size_--;
             return true;
         }
         else
@@ -35,10 +39,10 @@
 
     synchronized bool push(TYPE t)
     {
-        if (list.size < maxsize)
+        if (size_ < maxsize_)
         {
-            list.append(t);
-            ++size;
+            list_.append(t);
+            size_++;
             return true;
         }
         else
@@ -47,22 +51,31 @@
 
     synchronized int size()
     {
-        return size();
+        return size_;
     }
 
-    synchronized int processAll(int delegate(ref T value) dg)
+    synchronized int processAll(int delegate(ref TYPE value) dg)
     {
+        if (0 >= size_)
+            return 0;
+
         int count = 0;
-        foreach(T t; list)
+        foreach(TYPE t; list_)
         {
             if (dg(t) < 0)
                 break;
             ++count;
         }
-        if (count == size)
+        if (count == size_)
+        {
             clear_();
+            size_ = 0;
+        }
         else
-            list.removeRange(0, count);
+        {
+            list_.removeRange(0, count);
+            size_ -= count;
+        }
         return count;
     }
 
@@ -75,10 +88,12 @@
 
     void clear_()
     {
-        list.removeAll();
+        list_.clear();
+        size_ = 0 ;
     }
 
-    CircularSeq!(TYPE) list;
-    int maxsize;
-    int size;
+    int maxsize_;
+    int size_;
+    Logger log;
+    CircularSeq!(TYPE) list_;
 }
--- a/dsss.conf	Thu Jun 12 23:12:17 2008 -0400
+++ b/dsss.conf	Tue Jul 08 11:21:09 2008 -0400
@@ -1,2 +1,3 @@
 [test/test.d]
+buildFlags=-debug -gc