diff dreactor/core/Vat.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents
children d6a3cfe7c3de
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/core/Vat.d	Tue Aug 12 16:59:56 2008 -0400
@@ -0,0 +1,200 @@
+/*******************************************************************************
+
+        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved
+
+        license:        BSD style: $(LICENSE)
+
+        version:        Initial release v0.1 : May 2008
+        
+        author:         Rick Richardson
+
+*******************************************************************************/
+
+module dreactor.core.Vat;
+
+import tango.io.selector.Selector;
+import tango.io.selector.model.ISelector;
+import tango.core.Exception;
+import tango.core.Thread;
+import tango.core.Atomic;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.Task;
+import dreactor.util.ThreadSafeQueue;
+
+static char[] version_string = "Vat.d 0.1 2008-05-31";
+
+Logger log;
+
+enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
+alias Message delegate (Conduit c) HandlerDG;
+alias Message function (Conduit c) HandlerFN;
+
+class TaskAttachment
+{
+public
+    Task task;
+    HandlerDG dg;
+    HandlerFN fn;
+
+    this(Task ta, HandlerDG d) 
+    { TaskAttachment t; t.task = ta; t.dg = d; return t; }
+    
+    this(Task ta, HandlerFN f) 
+    { TaskAttachment t; t.task = ta; t.fn = f; return t; }
+
+    public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); }
+}
+
+class Vat
+{
+private
+    Thread thread;
+    bool running;
+ 
+    Task[int] tasks;
+    int taskCount;
+
+public 
+
+    this(Task t)
+    {
+        addTask(t);
+        this();
+    }
+
+    this()
+    {
+        log = Log.lookup("dreactor.core.Vat");
+        
+        running = true;
+        thread = new Thread(&eventLoop);
+        thread.start();
+    }
+    
+    void addTask(Task t)
+    {
+        t.setVat(this);
+        ++taskCount;
+        tasks[taskCount] = t;
+        t.setId(taskCount);
+    }
+ 
+    void exit()
+    {
+        running = false;
+    }
+
+    void wait()
+    {
+        thread.join();
+    }
+
+    bool addConnection()
+    {
+        log.trace("adding handler");
+        return selector.register(h.transport, h.events(), h);
+    }
+     
+    bool remConnection(Dispatcher handler)
+    {
+        return selector.unregister(h.transport);
+    }
+
+private
+    void eventLoop()
+    {
+        auto selector = new Selector();
+        selector.open();
+        do
+        {
+            execTasks();
+            auto eventCount = selector.select(0.01);
+
+            if (eventCount > 0)
+            {
+                // process events
+                foreach (SelectionKey key; selector.selectedSet())
+                {
+                    if (key.isReadable())
+                    {
+                        // incoming data
+                        log.trace("Read event fired");    
+                        auto conn = cast(Dispatcher) key.attachment;
+                        if ( Dispatcher.State.listening == conn.getState() )
+                            conn.handleConnection(conn.transport, &addConnection);
+                        else
+                            processReturn(conn.handleIncoming(), selector, conn);
+                    }
+                    else if (key.isWritable())
+                    {
+                        log.trace("Write event fired");    
+                        auto conn = cast(Dispatcher) key.attachment;
+                        processReturn(conn.handleOutgoing(), selector, conn);
+                    }
+                    else if (key.isHangup())
+                    {
+                        log.trace("Hangup event fired");
+                        auto conn = cast(Dispatcher) key.attachment;
+                        processReturn(conn.handleDisconnect(), selector, conn);
+                    }
+                    else if (key.isError() || key.isInvalidHandle())
+                    {
+                        log.trace("Error event fired");    
+                        // error, close connection
+                        auto conn = cast(Dispatcher) key.attachment;
+                        conn.handleError(&remConnection);
+                    }
+                }
+            }
+            else if (eventCount == 0)
+            {
+                /* can't think of anything useful to do here. */
+            }
+            else
+            {
+                log.error("Selector.select returned {}", eventCount);
+            }
+
+        } while (running)
+
+    }
+
+    void execTasks()
+    {
+        foreach(int k; tasks.keys)
+        {
+            if (tasks[k].state() == Fiber.State.HOLD)
+                tasks[k].call();
+            if (tasks[k].state() == Fiber.State.TERM)
+                tasks.remove(k);
+        }        
+    }
+
+    void processReturn(int result, Selector s, Dispatcher h)
+    {
+        switch(result)
+        {
+            case CLOSE:
+                s.unregister(h.transport);
+                h.transport.detach();
+            break;
+            case UNREGISTER:
+                s.unregister(h.transport);
+            break;
+            case REMAIN:
+                //this space intentially left blank
+            break;
+            case REGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            case REREGISTER:
+                s.register(h.transport, h.events(), h);
+            break;
+            default:
+                log.error("processReturn: unknown return value");
+        }
+    }
+}