diff asyncdreactor/core/AsyncVat.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents dreactor/core/AsyncVat.d@e75a2e506b1d
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/asyncdreactor/core/AsyncVat.d	Tue Aug 12 16:59:56 2008 -0400
@@ -0,0 +1,171 @@
+/*******************************************************************************
+
+        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved
+
+        license:        BSD style: $(LICENSE)
+
+        version:        Initial release v0.1 : May 2008
+        
+        author:         Rick Richardson
+
+*******************************************************************************/
+
+module dreactor.core.Vat;
+
+import tango.io.selector.Selector;
+import tango.io.selector.model.ISelector;
+import tango.core.Exception;
+import tango.core.Thread;
+import tango.core.Atomic;
+import tango.util.collection.LinkSeq;
+import tango.util.log.Log;
+
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.Dispatcher;
+import dreactor.util.ThreadSafeQueue;
+
+Logger log;
+
+enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
+
+static char[] version_string = "Vat.d 0.1 2008-05-31";
+
+class Vat
+{
+private
+    Thread thread;
+    bool running;
+    Atomic!(int) pending;
+ 
+    ThreadSafeQueue!(Dispatcher) freshList; 
+    ThreadSafeQueue!(Dispatcher) remList;
+public 
+    this()
+    {
+        freshList = new ThreadSafeQueue!(Dispatcher);
+        remList = new ThreadSafeQueue!(Dispatcher);
+        log = Log.lookup("dreactor.core.Vat");
+    }
+
+    void run()
+    {
+        running = true;
+        thread = new Thread(&eventLoop);
+        thread.start();
+    }
+
+    void exit()
+    {
+        running = false;
+    }
+
+    void wait()
+    {
+        thread.join();
+    }
+
+    bool addConnection(Dispatcher handler)
+    {
+        log.trace("adding handler");
+        return freshList.push(handler);       
+    }
+     
+    bool remConnection(Dispatcher handler)
+    {
+        return remList.push(handler);
+    }
+
+private
+    void eventLoop()
+    {
+        auto selector = new Selector();
+        selector.open();
+        do
+        {
+            auto eventCount = selector.select(0.01);
+
+            if (eventCount > 0)
+            {
+                // process events
+                foreach (SelectionKey key; selector.selectedSet())
+                {
+                    if (key.isReadable())
+                    {
+                        // incoming data
+                        log.trace("Read event fired");    
+                        auto conn = cast(Dispatcher) key.attachment;
+                        if ( Dispatcher.State.listening == conn.getState() )
+                            conn.handleConnection(conn.transport, &addConnection);
+                        else
+                            processReturn(conn.handleIncoming(), selector, conn);
+                    }
+                    else if (key.isWritable())
+                    {
+                        log.trace("Write event fired");    
+                        auto conn = cast(Dispatcher) key.attachment;
+                        processReturn(conn.handleOutgoing(), selector, conn);
+                    }
+                    else if (key.isHangup())
+                    {
+                        log.trace("Hangup event fired");
+                        auto conn = cast(Dispatcher) key.attachment;
+                        processReturn(conn.handleDisconnect(), selector, conn);
+                    }
+                    else if (key.isError() || key.isInvalidHandle())
+                    {
+                        log.trace("Error event fired");    
+                        // error, close connection
+                        auto conn = cast(Dispatcher) key.attachment;
+                        conn.handleError(&remConnection);
+                    }
+                }
+            }
+            else if (eventCount == 0)
+            {
+                /* can't think of anything useful to do here. */
+            }
+            else
+            {
+                log.error("Selector.select returned {}", eventCount);
+            }
+            //add Conduits to listener
+            freshList.processAll( (ref Dispatcher h)
+            {
+                selector.register(h.transport, h.events(), h);
+                return 1; 
+            });
+            remList.processAll( (ref Dispatcher h)
+            {
+                selector.unregister(h.transport);
+                return 1;
+            });
+
+        } while (running)
+
+    }
+
+    void processReturn(int result, Selector s, Dispatcher h)
+    {
+        switch(result)
+        {
+            case CLOSE:
+                s.unregister(h.transport);
+                h.transport.detach();
+            break;
+            case UNREGISTER:
+                s.unregister(h.transport);
+            break;
+            case REMAIN:
+                //this space intentially left blank
+            break;
+            case REGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            case REREGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            default:
+                log.error("processReturn: unknown return value");
+        }
+    }
+}