changeset 10:e75a2e506b1d

housekeeping
author rick@minifunk
date Fri, 01 Aug 2008 16:30:45 -0400
parents 5412a1ff2e49
children 5836613d16ac
files dreactor/core/AsyncVat.d dreactor/core/Vat.d dreactor/protocol/Http11.d dreactor/protocol/RawTcp.d test/chatserver.d test/test
diffstat 6 files changed, 324 insertions(+), 268 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/core/AsyncVat.d	Fri Aug 01 16:30:45 2008 -0400
@@ -0,0 +1,171 @@
+/*******************************************************************************
+
+        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved
+
+        license:        BSD style: $(LICENSE)
+
+        version:        Initial release v0.1 : May 2008
+        
+        author:         Rick Richardson
+
+*******************************************************************************/
+
+module dreactor.core.Vat;
+
+import tango.io.selector.Selector;
+import tango.io.selector.model.ISelector;
+import tango.core.Exception;
+import tango.core.Thread;
+import tango.core.Atomic;
+import tango.util.collection.LinkSeq;
+import tango.util.log.Log;
+
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.Dispatcher;
+import dreactor.util.ThreadSafeQueue;
+
+Logger log;
+
+enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
+
+static char[] version_string = "Vat.d 0.1 2008-05-31";
+
+class Vat
+{
+private
+    Thread thread;
+    bool running;
+    Atomic!(int) pending;
+ 
+    ThreadSafeQueue!(Dispatcher) freshList; 
+    ThreadSafeQueue!(Dispatcher) remList;
+public 
+    this()
+    {
+        freshList = new ThreadSafeQueue!(Dispatcher);
+        remList = new ThreadSafeQueue!(Dispatcher);
+        log = Log.lookup("dreactor.core.Vat");
+    }
+
+    void run()
+    {
+        running = true;
+        thread = new Thread(&eventLoop);
+        thread.start();
+    }
+
+    void exit()
+    {
+        running = false;
+    }
+
+    void wait()
+    {
+        thread.join();
+    }
+
+    bool addConnection(Dispatcher handler)
+    {
+        log.trace("adding handler");
+        return freshList.push(handler);       
+    }
+     
+    bool remConnection(Dispatcher handler)
+    {
+        return remList.push(handler);
+    }
+
+private
+    void eventLoop()
+    {
+        auto selector = new Selector();
+        selector.open();
+        do
+        {
+            auto eventCount = selector.select(0.01);
+
+            if (eventCount > 0)
+            {
+                // process events
+                foreach (SelectionKey key; selector.selectedSet())
+                {
+                    if (key.isReadable())
+                    {
+                        // incoming data
+                        log.trace("Read event fired");    
+                        auto conn = cast(Dispatcher) key.attachment;
+                        if ( Dispatcher.State.listening == conn.getState() )
+                            conn.handleConnection(conn.transport, &addConnection);
+                        else
+                            processReturn(conn.handleIncoming(), selector, conn);
+                    }
+                    else if (key.isWritable())
+                    {
+                        log.trace("Write event fired");    
+                        auto conn = cast(Dispatcher) key.attachment;
+                        processReturn(conn.handleOutgoing(), selector, conn);
+                    }
+                    else if (key.isHangup())
+                    {
+                        log.trace("Hangup event fired");
+                        auto conn = cast(Dispatcher) key.attachment;
+                        processReturn(conn.handleDisconnect(), selector, conn);
+                    }
+                    else if (key.isError() || key.isInvalidHandle())
+                    {
+                        log.trace("Error event fired");    
+                        // error, close connection
+                        auto conn = cast(Dispatcher) key.attachment;
+                        conn.handleError(&remConnection);
+                    }
+                }
+            }
+            else if (eventCount == 0)
+            {
+                /* can't think of anything useful to do here. */
+            }
+            else
+            {
+                log.error("Selector.select returned {}", eventCount);
+            }
+            //add Conduits to listener
+            freshList.processAll( (ref Dispatcher h)
+            {
+                selector.register(h.transport, h.events(), h);
+                return 1; 
+            });
+            remList.processAll( (ref Dispatcher h)
+            {
+                selector.unregister(h.transport);
+                return 1;
+            });
+
+        } while (running)
+
+    }
+
+    void processReturn(int result, Selector s, Dispatcher h)
+    {
+        switch(result)
+        {
+            case CLOSE:
+                s.unregister(h.transport);
+                h.transport.detach();
+            break;
+            case UNREGISTER:
+                s.unregister(h.transport);
+            break;
+            case REMAIN:
+                //this space intentially left blank
+            break;
+            case REGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            case REREGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            default:
+                log.error("processReturn: unknown return value");
+        }
+    }
+}
--- a/dreactor/core/Vat.d	Sat Jul 12 10:42:41 2008 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,171 +0,0 @@
-/*******************************************************************************
-
-        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved
-
-        license:        BSD style: $(LICENSE)
-
-        version:        Initial release v0.1 : May 2008
-        
-        author:         Rick Richardson
-
-*******************************************************************************/
-
-module dreactor.core.Vat;
-
-import tango.io.selector.Selector;
-import tango.io.selector.model.ISelector;
-import tango.core.Exception;
-import tango.core.Thread;
-import tango.core.Atomic;
-import tango.util.collection.LinkSeq;
-import tango.util.log.Log;
-
-import dreactor.transport.AsyncSocketConduit;
-import dreactor.core.Dispatcher;
-import dreactor.util.ThreadSafeQueue;
-
-Logger log;
-
-enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
-
-static char[] version_string = "Vat.d 0.1 2008-05-31";
-
-class Vat
-{
-private
-    Thread thread;
-    bool running;
-    Atomic!(int) pending;
- 
-    ThreadSafeQueue!(Dispatcher) freshList; 
-    ThreadSafeQueue!(Dispatcher) remList;
-public 
-    this()
-    {
-        freshList = new ThreadSafeQueue!(Dispatcher);
-        remList = new ThreadSafeQueue!(Dispatcher);
-        log = Log.lookup("dreactor.core.Vat");
-    }
-
-    void run()
-    {
-        running = true;
-        thread = new Thread(&eventLoop);
-        thread.start();
-    }
-
-    void exit()
-    {
-        running = false;
-    }
-
-    void wait()
-    {
-        thread.join();
-    }
-
-    bool addConnection(Dispatcher handler)
-    {
-        log.trace("adding handler");
-        return freshList.push(handler);       
-    }
-     
-    bool remConnection(Dispatcher handler)
-    {
-        return remList.push(handler);
-    }
-
-private
-    void eventLoop()
-    {
-        auto selector = new Selector();
-        selector.open();
-        do
-        {
-            auto eventCount = selector.select(0.01);
-
-            if (eventCount > 0)
-            {
-                // process events
-                foreach (SelectionKey key; selector.selectedSet())
-                {
-                    if (key.isReadable())
-                    {
-                        // incoming data
-                        log.trace("Read event fired");    
-                        auto conn = cast(Dispatcher) key.attachment;
-                        if ( Dispatcher.State.listening == conn.getState() )
-                            conn.handleConnection(conn.transport, &addConnection);
-                        else
-                            processReturn(conn.handleIncoming(), selector, conn);
-                    }
-                    else if (key.isWritable())
-                    {
-                        log.trace("Write event fired");    
-                        auto conn = cast(Dispatcher) key.attachment;
-                        processReturn(conn.handleOutgoing(), selector, conn);
-                    }
-                    else if (key.isHangup())
-                    {
-                        log.trace("Hangup event fired");
-                        auto conn = cast(Dispatcher) key.attachment;
-                        processReturn(conn.handleDisconnect(), selector, conn);
-                    }
-                    else if (key.isError() || key.isInvalidHandle())
-                    {
-                        log.trace("Error event fired");    
-                        // error, close connection
-                        auto conn = cast(Dispatcher) key.attachment;
-                        conn.handleError(&remConnection);
-                    }
-                }
-            }
-            else if (eventCount == 0)
-            {
-                /* can't think of anything useful to do here. */
-            }
-            else
-            {
-                log.error("Selector.select returned {}", eventCount);
-            }
-            //add Conduits to listener
-            freshList.processAll( (ref Dispatcher h)
-            {
-                selector.register(h.transport, h.events(), h);
-                return 1; 
-            });
-            remList.processAll( (ref Dispatcher h)
-            {
-                selector.unregister(h.transport);
-                return 1;
-            });
-
-        } while (running)
-
-    }
-
-    void processReturn(int result, Selector s, Dispatcher h)
-    {
-        switch(result)
-        {
-            case CLOSE:
-                s.unregister(h.transport);
-                h.transport.detach();
-            break;
-            case UNREGISTER:
-                s.unregister(h.transport);
-            break;
-            case REMAIN:
-                //this space intentially left blank
-            break;
-            case REGISTER:
-                s.register(h.transport, h.events(), h);
-            break;
-            case REREGISTER:
-                s.register(h.transport, h.events(), h);
-            break;
-            default:
-                log.error("processReturn: unknown return value");
-        }
-    }
-}
--- a/dreactor/protocol/Http11.d	Sat Jul 12 10:42:41 2008 -0400
+++ b/dreactor/protocol/Http11.d	Fri Aug 01 16:30:45 2008 -0400
@@ -1,5 +1,43 @@
-module dreactor.protocol.Http11.d;
+module dreactor.protocol.Http11;
+
+import dreactor.protocol.RawTcp;
+import dreactor.protocol.http11_parser;
+
+class HttpListener
+{
+public
+    this(Vat sel, IPv4Address addr)
+    {
+        listener = new RawTCPListener(sel, IPv4Address addr);
+        parser = new Http11Parser(); 
+        listener.setDataHandler(&onData);
+    }
+
+    private int onData(char[] buffer)
+    {
+        parser.execute(buffer); 
+    }
 
-class Http
+    
+private
+    RawTCPListener listener;
+    Http11Parser parser;
+}
+
+class HttpClient
 {
+public
+    this(Vat sel)
+    {
+        client = new RawTCPClient(sel);
+        client.setDataHandler(&onData);
+    }
+
+    private int onData()
+    {
+        
+    }
+private
+    RawTCPClient client;
 }
+
--- a/dreactor/protocol/RawTcp.d	Sat Jul 12 10:42:41 2008 -0400
+++ b/dreactor/protocol/RawTcp.d	Fri Aug 01 16:30:45 2008 -0400
@@ -16,19 +16,18 @@
     Basic TCP server or client routines for sending raw data.
 
 ******************************************************************************/
-class RawTCPListener : RawTCPHandler
+class RawTCPListener
 {
 public
-    Logger log; 
     this(Dispatcher mgr, Vat sel, IPv4Address addr)
     {
         manager = mgr;
         mgr.events(Event.Read);
-        mgr.setOutgoingHandler(&onSend);
+        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
         mgr.setIncomingHandler(&onReceive);
         mgr.setConnectHandler(&accept);
-        mgr.setErrorHandler(&onError);
-        mgr.setDisconnectHandler(&onHangup);
+        mgr.setErrorHandler(&RawTCPHandler.onError);
+        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
         mgr.listen(addr);
          
         sel.addConnection(mgr);
@@ -47,13 +46,7 @@
 
     ~this()
     {
-        foreach(Dispatcher d; children)
-        {
-            (cast(AsyncSocketConduit)d.transport).shutdown();
-            (cast(AsyncSocketConduit)d.transport).detach();
-        }
-        (cast(AsyncSocketConduit)manager.transport).shutdown();
-        (cast(AsyncSocketConduit)manager.transport).detach(); 
+        close();
     } 
 
     int accept(Conduit cond, RegisterD reg)
@@ -96,7 +89,6 @@
         if (d.appendOutBuffer(outbuf))
         {
             d.addEvent(Event.Write);
-            d.setOutgoingHandler(&onSend);
             if (!vat.addConnection(d))
             {
                 log.error("unable to register mgr");
@@ -105,7 +97,14 @@
         return 0;
     }
 
-    public int onReceive(Dispatcher h)
+    /**************************************************************************
+
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    int onReceive(Dispatcher h)
     {
         Logger log = Log.lookup("Handlers.onReceive");
 
@@ -133,28 +132,43 @@
  
     void close()
     {
+        foreach(Dispatcher d; children)
+        {
+            (cast(AsyncSocketConduit)d.transport).shutdown();
+            (cast(AsyncSocketConduit)d.transport).detach();
+        }
+        (cast(AsyncSocketConduit)manager.transport).shutdown();
+        (cast(AsyncSocketConduit)manager.transport).detach(); 
         
     }
+    
+    void setDataHandler(void delegate(char[], Dispatcher) h)
+    {
+        dataHandler = h;
+    }
 
 private
-    Dispatcher manager;
     Vat vat;
     CircularSeq!(Dispatcher) children;
+    Dispatcher manager;
+    Logger log;
+    RawTCPHandler h;
+    void delegate(char[], Dispatcher) dataHandler;
 }
 
-class RawTCPClient : RawTCPHandler
+class RawTCPClient
 {
+
 public
-    Logger log;
     this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
     {
         manager = mgr;
         manager.events(evts);
         connected = false;
-        mgr.setOutgoingHandler(&onSend);
+        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
         mgr.setIncomingHandler(&onReceive);
-        mgr.setErrorHandler(&onError);
-        mgr.setDisconnectHandler(&onHangup);
+        mgr.setErrorHandler(&RawTCPHandler.onError);
+        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
         vat = sel;
         log = Log.lookup("dreactor.protocol.RawTcpClient");
     }
@@ -214,10 +228,48 @@
         return 0;
     }
     
+    /**************************************************************************
+
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    public int onReceive(Dispatcher h)
+    {
+        Logger log = Log.lookup("Handlers.onReceive");
+
+        char inbuf[8192];
+        int amt;
+        if((amt = h.transport.read(inbuf)) > 0)
+        {
+            if (dataHandler)
+                dataHandler(inbuf[0 .. amt], h);
+            else
+                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
+        }
+        else
+        {
+            if (amt == 0)
+            {
+                return CLOSE;
+            }
+            log.error("Received no data, err = {}", amt);
+        }
+        return REMAIN;
+    }
+    
+    void setDataHandler(void delegate(char[], Dispatcher) h)
+    {
+        dataHandler = h;
+    }
 private
+    void delegate(char[], Dispatcher) dataHandler;
     Dispatcher manager;
     Vat vat;
     bool connected;
+    Logger log;
+    RawTCPHandler h;
 }
 
 
@@ -226,94 +278,60 @@
     Default Event handlers common to both listener/clients
 
 ******************************************************************************/
-class RawTCPHandler
-{
-/**************************************************************************
-
-    onSend
-    OutgoingHandlerD 
-    To be registered as the response to socket writable event. 
-    Sends data, returns amount sent. Unregisters Handler for sending
-    if there is no more data left to send. 
-
-***************************************************************************/
-public int onSend(Dispatcher h)
+struct RawTCPHandler
 {
-    Logger log = Log.lookup("Handlers.onSend");
- 
-    char[] outbuf = h.nextBuffer();
-    if (outbuf !is null)
+    /**************************************************************************
+
+        onSend
+        OutgoingHandlerD 
+        To be registered as the response to socket writable event. 
+        Sends data, returns amount sent. Unregisters Handler for sending
+        if there is no more data left to send. 
+
+    ***************************************************************************/
+    public static int onSend(Dispatcher h)
     {
-        int sent = h.transport.write(outbuf);
-        if (sent > 0)
+        Logger log = Log.lookup("Handlers.onSend");
+     
+        char[] outbuf = h.nextBuffer();
+        if (outbuf !is null)
         {
-            if (! h.addOffset(sent))
+            int sent = h.transport.write(outbuf);
+            if (sent > 0)
             {
-                h.remEvent(Event.Write);
-                return REREGISTER;
+                if (! h.addOffset(sent))
+                {
+                    h.remEvent(Event.Write);
+                    return REREGISTER;
+                }
             }
-        }
-        else if (sent == AsyncSocketConduit.Eof)
-        {
-            log.error("Select said socket was writable, but sent 0 bytes");
+            else if (sent == AsyncSocketConduit.Eof)
+            {
+                log.error("Select said socket was writable, but sent 0 bytes");
+            }
+            else
+            {
+               log.error("Socket send return ERR");
+            }
+            return REMAIN;
         }
         else
         {
-           log.error("Socket send return ERR");
+            h.remEvent(Event.Write);
+            return REREGISTER;
         }
-        return REMAIN;
     }
-    else
-    {
-        h.remEvent(Event.Write);
-        return REREGISTER;
-    }
-}
-/**************************************************************************
 
-    receive
-    IncomingHandlerD
-    Default incoming data handler. Should be replaced with something useful.
-
-**************************************************************************/ 
-public int onReceive(Dispatcher h)
-{
-    Logger log = Log.lookup("Handlers.onReceive");
-
-    char inbuf[8192];
-    int amt;
-    if((amt = h.transport.read(inbuf)) > 0)
+    static int onHangup(Dispatcher d)
     {
-        if (dataHandler)
-            dataHandler(inbuf[0 .. amt], h);
-        else
-            log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
-    }
-    else
-    {
-        if (amt == 0)
-        {
-            return CLOSE;
-        }
-        log.error("Received no data, err = {}", amt);
+        return UNREGISTER;
     }
-    return REMAIN;
-}
-int onHangup(Dispatcher d)
-{
-    return UNREGISTER;
-}
-int onError(Dispatcher d, RegisterD unreg)
-{
-    return CLOSE;
-} 
 
-void setDataHandler(void delegate(char[],Dispatcher) del)
-{
-    dataHandler = del;
-}
-protected
-    void delegate(char[], Dispatcher) dataHandler;
+    static int onError(Dispatcher d, RegisterD unreg)
+    {
+        return CLOSE;
+    } 
+
 }
 
 bool includes(Dispatcher[] haystack, Dispatcher needle)
--- a/test/chatserver.d	Sat Jul 12 10:42:41 2008 -0400
+++ b/test/chatserver.d	Fri Aug 01 16:30:45 2008 -0400
@@ -10,13 +10,13 @@
 import dreactor.protocol.RawTcp;
 import dreactor.transport.AsyncSocketConduit;
 
-int count;
 int main()
 { 
     Vat l_vat = new Vat();
     Logger log = Log.lookup("dreactor.chatserver");
     Log.root.level(log.Level.Info, true); 
     RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555));
+
     listener.setDataHandler( (char[] inbuf, Dispatcher d) {
     
         listener.broadcast(inbuf, [d]);
Binary file test/test has changed