changeset 12:d6a3cfe7c3de

more stuff
author rick@Macintosh.local
date Wed, 27 Aug 2008 00:47:33 -0400
parents 5836613d16ac
children 8c9b1276f623
files dreactor/core/Task.d dreactor/core/Vat.d dreactor/protocol/DefaultProvider.d dreactor/protocol/Emitter.d dreactor/protocol/IProvider.d dreactor/protocol/Protocol.d dreactor/protocol/RawTcp.d dreactor/protocol/TcpProvider.d dreactor/protocol/UdpProvider.d dreactor/transport/AsyncSocketConduit.d dsss.conf dsss.last test/async/chatclient test/async/chatclient.d test/async/chatserver test/async/chatserver.d test/async/dummy.txt test/async/longtest test/async/longtest.d test/async/test test/async/test.d test/chatclient.d test/chatserver.d
diffstat 22 files changed, 1252 insertions(+), 501 deletions(-) [+]
line wrap: on
line diff
--- a/dreactor/core/Task.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/dreactor/core/Task.d	Wed Aug 27 00:47:33 2008 -0400
@@ -1,26 +1,124 @@
 module dreactor.core.Task;
 
 import tango.core.Thread;
+import tango.util.container.HashMap;
+import tango.util.container.CircularList;
 import dreactor.core.Vat;
-import dreactor.protocol.Protocol;
-import dreactor.protocol.Dispatcher;
+import dreactor.protocol.IProvider;
+
+alias CircularList!(Message) Messages;
+
+class Mailbox
+{
+public
+
+    this () { box = new HashMap!(int, Messages); }
+
+    Message popMessageOfType(int type)
+    {
+        Messages m;
+        if (box.get(type, m))
+        {
+            Message msg = m.removeHead();
+            if (msg)
+                msg_count.store(msg_count.load()-1);
+
+            if (m.isEmpty())
+                box.removeKey(type);
+
+            return msg;
+        }
+        else
+            return null;
+    }
+
+    //TODO this could be optimized to use set intersection logic instead of checking for 
+    //multiple keys one at a time. 
+    Message popMessageOfType(int[] types)
+    {
+        foreach(int i; types)
+        {
+            Message msg = popMessageOfType(i);
+            if (msg)
+                return msg;
+        }
+        return null;
+    }
+
+    Message popMessage()
+    {
+        Messages m;
+        int key;
+        auto itor = box.iterator;
 
-alias CircularSeq!(Message) Mailbox;
+        do
+        {
+            if (itor.valid && itor.next(key, m))
+            {
+                if (!m.isEmpty())
+                {
+                    Message msg = m.removeHead();
+                    if (msg)
+                        msg_count.store(msg_count.load()-1);
+                    if (m.isEmpty())
+                        box.removeKey(key);
+                    return msg;
+                }
+                else 
+                {
+                    iterator.remove();
+                }
+            }
+            else
+                return null;
+        }
+        while (true)
+    }
 
+    void push(Message msg)
+    {
+        Messages m;
+        if (box.get(msg.type, m))
+            m.append(msg);
+        else
+        {
+            m = new Messages;
+            m.append(msg);
+            box.add(msg.type, m);
+        }
+        msg_count.store(msg_count.load()+1);
+    }
+
+    int count()
+    {
+        return msg_count.load();
+    }
+private
+    HashMap!(int, Messages) box;
+    Atomic!(int) msg_count;
+}
+
+alias void delegate (Message) TaskDg;
 class Task
 {
 private
     Fiber fiber;
     Mailbox mailbox;
+    Mailbox lockedMailbox;
     int id;
     Vat vat;
-    dispatcher[Conduit] dispatchers;
-    
+    TaskDG taskdg;
+    IProvider provider;
+
 public
-    this() 
+    this(TaskDg tdg = null, IProvider provider = null) 
     {
         fiber = new Fiber(&run);
         mailbox = new Mailbox;
+        lockedMailbox = new Mailbox;
+        taskdg = tdg;
+        if (!provider)
+            provider = new DefaultProvider;
     }
 
     void setId(int i)
@@ -28,9 +126,14 @@
         id = i;
     }
 
-    Mailbox getMailbox() 
+    void  appendMessage(Message m) 
     { 
-        return mailbox; 
+        mailbox.push(m); 
+    }
+
+    synchronized void appendIVMessage(Message m)
+    {
+        lockedMailbox.push(m);
     }
 
     void setVat(Vat v)
@@ -38,7 +141,47 @@
         vat = v;
     }
 
-    abstract void run();
+    IProvider getProvider()
+    {
+        return provider;
+    }
+
+    void run()
+    in
+    {
+        assert(taskdg !is null);
+    }
+    body
+    {
+        while (msg = receive())
+        {
+            taskdg(msg);
+        }
+    }
+
+    /***************************************************************************
+        sendTo
+        Basic message passing utility for inter-task communication. 
+        It first checks the local Vat to see if the task is present, if not
+        it gets the task from the global registry and sends a message to its 
+        thread-safe mailbox. 
+    ****************************************************************************/
+ 
+    bool sendTo(int taskid, Message m)
+    {
+        Task t;
+        if (t = vat.getTask(taskid))
+        {
+            t.appendMessage(m);
+            return true;
+        }
+        else if (t = Vat.getGlobalTask(taskid))
+        {
+            t.appendIVMessage(m);
+            return true;
+        }
+        return false;
+    }
 
 protected
 
@@ -46,39 +189,38 @@
         receive
         User-called function to get the next pending message in the mailbox. 
         If there are no pending messages, this will yield control back to 
-        the scheduler/vat. 
+        the vat's scheduler. 
     ***************************************************************************/
 
-    Message receive() 
+    Message receive(int[] types) 
     {
-        Message m = mailbox.head();
-        mailbox.removeHead();
-        return m;
+        while(true)
+        {
+            Message m = mailbox.popMessageOfType(types);
+            if (!m)
+                Fiber.yield();
+            else if (SYSTEM_QUIT == m.type)
+                break;
+            else return m;
+            
+        }
+        return null;
+    }
+
+    Message receive()
+    {
+        while(true)
+        {
+            Message m = mailbox.popMessage();
+            if (!m)
+                Fiber.yield();
+            else if (SYSTEM_QUIT == m.type)
+                break;
+            else return m;
+        }
+        return null;
     }
 
     int getId() { return id;}
     
-    /**************************************************************************
-    
-        send
-        User-called function to send data to the counterpart at the other
-        end of the connection. This sets up a dispatcher to send
-        data as the conduit becomes free. 
-
-    **************************************************************************/
-    int send(char[] outbuf, Conduit c)
-    {
-        Dispatcher dis;
-        if ( ! (dis = (c in dispatchers)))
-            dis = new Dispatcher(c);
-
-        if (dis.appendOutBuffer(outbuf))
-        {
-            if (!vat.addConnection(dis))
-            {
-                log.error("unable to register mgr");
-            }
-        }
-        return 0;
-    } 
 }
--- a/dreactor/core/Vat.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/dreactor/core/Vat.d	Wed Aug 27 00:47:33 2008 -0400
@@ -21,31 +21,23 @@
 import tango.util.log.Log;
 
 import dreactor.transport.AsyncSocketConduit;
+import dreactor.protocol.IProvider;
 import dreactor.core.Task;
 import dreactor.util.ThreadSafeQueue;
 
 static char[] version_string = "Vat.d 0.1 2008-05-31";
 
-Logger log;
 
 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
-alias Message delegate (Conduit c) HandlerDG;
-alias Message function (Conduit c) HandlerFN;
 
 class TaskAttachment
 {
 public
     Task task;
-    HandlerDG dg;
-    HandlerFN fn;
+    IProvider provider;
 
-    this(Task ta, HandlerDG d) 
-    { TaskAttachment t; t.task = ta; t.dg = d; return t; }
-    
-    this(Task ta, HandlerFN f) 
-    { TaskAttachment t; t.task = ta; t.fn = f; return t; }
-
-    public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); }
+    this (Task ta, IProvider p) 
+    { task = ta; provider = p; }
 }
 
 class Vat
@@ -53,18 +45,16 @@
 private
     Thread thread;
     bool running;
+    Logger log;
  
-    Task[int] tasks;
-    int taskCount;
+    TaskAttachment[int] tasks; //registry for local tasks
+
+    Selector selector;
+    static Atomic!(int) taskCount;
+    TaskAttachment[int] globalTasks; //global registry of tasks
 
 public 
 
-    this(Task t)
-    {
-        addTask(t);
-        this();
-    }
-
     this()
     {
         log = Log.lookup("dreactor.core.Vat");
@@ -73,13 +63,17 @@
         thread = new Thread(&eventLoop);
         thread.start();
     }
-    
-    void addTask(Task t)
+     
+    synchronized int addTask(Task t, IProvider p = null)
     {
         t.setVat(this);
         ++taskCount;
-        tasks[taskCount] = t;
+        auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
+        tasks[taskCount] = ta;
+        globalTask[taskCount] = ta;
+        selector.register(p.getConduit(), p.getEvents(), ta);
         t.setId(taskCount);
+        return taskCount;
     }
  
     void exit()
@@ -92,21 +86,43 @@
         thread.join();
     }
 
-    bool addConnection()
+    bool addConnection(int tid, Conduit c, Events evts)
     {
         log.trace("adding handler");
-        return selector.register(h.transport, h.events(), h);
+        TaskAttachment ta;
+        if (ta = (tid in tasks))
+            return selector.register(c, evts, ta);
+        else
+            return false;
     }
      
-    bool remConnection(Dispatcher handler)
+    bool remConnection(Conduit c)
+    {
+        return selector.unregister(c);
+    }
+
+    Task getTask(int tid)
     {
-        return selector.unregister(h.transport);
+        TaskAttachment ta;
+        if (ta = (tid in tasks))
+            return ta.task;
+        else
+            return null;
+    }
+
+    static synchronized Task getGlobalTask(int tid)
+    {
+        TaskAttachment ta;
+        if (ta = (tid in globaltasks))
+            return ta.task;
+        else
+            return null;
     }
 
 private
     void eventLoop()
     {
-        auto selector = new Selector();
+        selector = new Selector();
         selector.open();
         do
         {
@@ -122,30 +138,31 @@
                     {
                         // incoming data
                         log.trace("Read event fired");    
-                        auto conn = cast(Dispatcher) key.attachment;
-                        if ( Dispatcher.State.listening == conn.getState() )
-                            conn.handleConnection(conn.transport, &addConnection);
-                        else
-                            processReturn(conn.handleIncoming(), selector, conn);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit);
+
                     }
                     else if (key.isWritable())
                     {
                         log.trace("Write event fired");    
-                        auto conn = cast(Dispatcher) key.attachment;
-                        processReturn(conn.handleOutgoing(), selector, conn);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        ta.appendMessage(ta.provider.handleWrite());
+                        processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit);
                     }
                     else if (key.isHangup())
                     {
                         log.trace("Hangup event fired");
-                        auto conn = cast(Dispatcher) key.attachment;
-                        processReturn(conn.handleDisconnect(), selector, conn);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        ta.appendMessage(ta.provider.handleDisconnect());
+                        processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit);
                     }
                     else if (key.isError() || key.isInvalidHandle())
                     {
-                        log.trace("Error event fired");    
+                        log.trace("Error event fired"); 
                         // error, close connection
-                        auto conn = cast(Dispatcher) key.attachment;
-                        conn.handleError(&remConnection);
+                        auto ta = cast(TaskAttachment) key.attachment;
+                        ta.appendMessage(ta.provider.handleError());
+                        processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit);
                     }
                 }
             }
@@ -173,25 +190,23 @@
         }        
     }
 
-    void processReturn(int result, Selector s, Dispatcher h)
+    void processReturn(int result, Conduit c)
     {
         switch(result)
         {
             case CLOSE:
-                s.unregister(h.transport);
-                h.transport.detach();
+                selector.unregister(c);
+                c.detach();
             break;
             case UNREGISTER:
-                s.unregister(h.transport);
+                selector.unregister(c);
             break;
             case REMAIN:
                 //this space intentially left blank
             break;
             case REGISTER:
-                s.register(h.transport, h.events(), h);
             break;
             case REREGISTER:
-                s.register(h.transport, h.events(), h);
             break;
             default:
                 log.error("processReturn: unknown return value");
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/DefaultProvider.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,51 @@
+module dreactor.protocol.DefaultProvider;
+
+import tango.io.Selector;
+
+import dreactor.protocol.IProvider;
+
+
+
+class DefaultProvider : IProvider
+{
+private
+    Conduit cond;
+    Events evts;
+
+public
+    Message handleRead(Conduit c)
+    {
+    }
+
+    Message handleWrite(Conduit c)
+    {
+    }
+
+    Message handleError(Conduit c)
+    {
+    }
+
+    Message handleConnect(Conduit c)
+    {
+    }
+
+    Message handleDisconnect(Conduit c)
+    {
+    }
+
+
+    Conduit getConduit()
+    {
+        return cond;
+    }
+
+    int getEvents()
+    {
+        return evts;
+    }
+
+    void setEvents(Event e)
+    {
+        evts e;
+    }
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/Emitter.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,48 @@
+module Emitter
+
+
+
+
+import tango.core.Thread;
+
+import dreactor.core.Task;
+
+alias Message delegate(void) EmitterDg;
+
+class Emitter
+{
+public
+    this(Task t, EmitterDg cb)
+    {
+        task = t;
+        callback = cb;
+        thread = new Thread(&run);
+        thread.start();
+    }
+
+    void stop()
+    {
+        running = false;
+    }
+    
+    void stopNow()
+    {
+        thread.isDaemon(true);
+        running = false;
+    }
+private
+
+    void run()
+    {
+        while(running)
+        {
+            Message msg = callback();
+            task.appendIVMessage(msg);
+        }
+    }
+    Task task;
+    Thread thread;
+    bool running;
+    EmitterCb callback;
+}
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/IProvider.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,29 @@
+module dreactor.protocol.IProvider;
+
+class Message
+{
+public
+    int type;
+    int info;
+    Object payload;
+    this (Object buf, int t, int e) 
+    {
+        type = t; 
+        info = e; 
+        payload = buf; 
+    }
+}
+
+interface IProvider
+{
+    Message handleRead(Conduit c);
+    Message handleWrite(Conduit c);
+    Message handleError(Conduit c);
+    Message handleConnect(Conduit c);
+    Message handleDisconnect(Conduit c);
+    abstract void send(char []);
+
+    Conduit getConduit();
+    int getEvents();
+    void setEvents();
+}
--- a/dreactor/protocol/Protocol.d	Tue Aug 12 16:59:56 2008 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,26 +0,0 @@
-module dreactor.protocol.IProtocol;
-
-struct Message
-{
-public
-    int type;
-    int errorcode;
-    Object payload;
-    static Message opCall(Object buf, int t, int e) 
-    {
-        Message m; 
-        m.type = t; 
-        errorcode = e; 
-        m.payload = buf; 
-        return m;
-    }
-}
-
-interface IProtocol
-{
-   Message handleRead(Conduit c);
-   Message handleWrite(Conduit c);
-   Message handleError(Conduit c);
-   Message handleConnect(Conduit c);
-   Message handleDisconnect(Conduit c);
-}
--- a/dreactor/protocol/RawTcp.d	Tue Aug 12 16:59:56 2008 -0400
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,333 +0,0 @@
-module dreactor.protocol.RawTcp;
-
-import tango.io.Conduit;
-import tango.io.selector.model.ISelector;
-import tango.net.Socket;
-import tango.util.collection.CircularSeq;
-import tango.util.log.Log;
-import tango.util.log.Config;
-
-import dreactor.transport.AsyncSocketConduit;
-import dreactor.core.Vat;
-import dreactor.core.Dispatcher;
-
-/******************************************************************************
-    
-    Basic TCP server or client routines for sending raw data.
-
-******************************************************************************/
-class RawTCPListener
-{
-public
-
-    this(AsyncSocketConduit cond)
-    {
-        log = Log.lookup("dreactor.protocol.RawTcpServer");
-        log.info("log initialized");
-        children = new CircularSeq!(Dispatcher);
-    }
-
-    this(Vat sel, IPv4Address addr)
-    {
-        AsyncSocketConduit cond = new AsyncSocketConduit;
-        cond.socket().setAddressReuse(true);
-        this(cond);
-    }
-
-    ~this()
-    {
-        close();
-    } 
-
-    AsyncSocketConduit accept(Conduit cond, RegisterD reg)
-    {
-        AsyncSocketConduit newcond = new AsyncSocketConduit;
-        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
-        h.events(Event.Read);
-        vat.addConnection(h);
-        children.append(h);
-        log.info("accepted new connection");
-        return newcond;
-    }
-
-    int broadcast(char[] outbuf, AsyncSocketConduit[] recips)
-    {
-        foreach(AsyncSocketConduit c; recips)
-        {
-            if (c.appendOutBuffer(outbuf))
-            {
-                h.addEvent(Event.Write);
-                vat.addConnection(h);
-            }
-        }
-        return 0;
-    }
-    
-    /**************************************************************************
-    
-        send
-        User-called function to send data to the counterpart at the other
-        end of the connection. This sets up the connection manager to send
-        data as the socket becomes free. 
-
-    **************************************************************************/
-    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
-    {
-        if (d.appendOutBuffer(outbuf))
-        {
-            d.addEvent(Event.Write);
-            if (!vat.addConnection(d))
-            {
-                log.error("unable to register mgr");
-            }
-        }
-        return 0;
-    }
-
-    /**************************************************************************
-
-        receive
-        IncomingHandlerD
-        Default incoming data handler. Should be replaced with something useful.
-
-    **************************************************************************/ 
-    int onReceive(Dispatcher h)
-    {
-        Logger log = Log.lookup("Handlers.onReceive");
-
-        char inbuf[8192];
-        int amt;
-        if((amt = h.transport.read(inbuf)) > 0)
-        {
-            if (dataHandler)
-                dataHandler(inbuf[0 .. amt], h);
-            else
-                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
-        }
-        else
-        {
-            if (amt == 0)
-            {
-                children.remove(h);
-                (cast(AsyncSocketConduit) h.transport).shutdown();
-                return CLOSE;
-            }
-            log.error("Received no data, err = {}", amt);
-        }
-        return REMAIN;
-    }
- 
-    void close()
-    {
-        foreach(Dispatcher d; children)
-        {
-            (cast(AsyncSocketConduit)d.transport).shutdown();
-            (cast(AsyncSocketConduit)d.transport).detach();
-        }
-        (cast(AsyncSocketConduit)manager.transport).shutdown();
-        (cast(AsyncSocketConduit)manager.transport).detach(); 
-        
-    }
-    
-    void setDataHandler(void delegate(char[], Dispatcher) h)
-    {
-        dataHandler = h;
-    }
-
-private
-    Vat vat;
-    CircularSeq!(Dispatcher) children;
-    Dispatcher manager;
-    Logger log;
-    RawTCPHandler h;
-    void delegate(char[], Dispatcher) dataHandler;
-}
-
-class RawTCPClient
-{
-
-public
-    this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
-    {
-        manager = mgr;
-        manager.events(evts);
-        connected = false;
-        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
-        mgr.setIncomingHandler(&onReceive);
-        mgr.setErrorHandler(&RawTCPHandler.onError);
-        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
-        vat = sel;
-        log = Log.lookup("dreactor.protocol.RawTcpClient");
-    }
-
-    this(Vat sel, Event evts = Event.Read)
-    {
-        AsyncSocketConduit clcond = new AsyncSocketConduit;
-        Dispatcher ch = new Dispatcher(clcond);
-        this(ch, sel, evts);
-    }
-
-    ~this()
-    {
-        (cast(AsyncSocketConduit)manager.transport).shutdown();
-        (cast(AsyncSocketConduit)manager.transport).detach();
-    }
-
-    int connect(IPv4Address addr)
-    {
-        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
-        vat.addConnection(manager);
-        connected = true;
-        log.info("connected to {}", addr);
-        return 0;
-    } 
-
-    /**************************************************************************
-    
-        send
-        User-called function to send data to the counterpart at the other
-        end of the connection. This sets up the connection manager to send
-        data as the socket becomes free. 
-
-    **************************************************************************/
-    int send(char[] outbuf, IPv4Address addr = null)
-    {
-        if (!connected)
-        {
-            log.info("send: not connected, connecting");
-            if (addr !is null)
-            {
-                if (0 > connect(addr))
-                {
-                    log.error("send: unable to connect");
-                    return -1;
-                }
-            }
-        }
-        if (manager.appendOutBuffer(outbuf))
-        {
-            manager.addEvent(Event.Write);
-            if (!vat.addConnection(manager))
-            {
-                log.error("unable to register mgr");
-            }
-        }
-        return 0;
-    }
-    
-    /**************************************************************************
-
-        receive
-        IncomingHandlerD
-        Default incoming data handler. Should be replaced with something useful.
-
-    **************************************************************************/ 
-    public int onReceive(Dispatcher h)
-    {
-        Logger log = Log.lookup("Handlers.onReceive");
-
-        char inbuf[8192];
-        int amt;
-        if((amt = h.transport.read(inbuf)) > 0)
-        {
-            if (dataHandler)
-                dataHandler(inbuf[0 .. amt], h);
-            else
-                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
-        }
-        else
-        {
-            if (amt == 0)
-            {
-                return CLOSE;
-            }
-            log.error("Received no data, err = {}", amt);
-        }
-        return REMAIN;
-    }
-    
-    void setDataHandler(void delegate(char[], Dispatcher) h)
-    {
-        dataHandler = h;
-    }
-
-private
-    void delegate(char[], Dispatcher) dataHandler;
-    Dispatcher manager;
-    Vat vat;
-    bool connected;
-    Logger log;
-    RawTCPHandler h;
-}
-
-
-/******************************************************************************
-
-    Default Event handlers common to both listener/clients
-
-******************************************************************************/
-struct RawTCPHandler
-{
-    /**************************************************************************
-
-        onSend
-        OutgoingHandlerD 
-        To be registered as the response to socket writable event. 
-        Sends data, returns amount sent. Unregisters Handler for sending
-        if there is no more data left to send. 
-
-    ***************************************************************************/
-    public static int onSend(Dispatcher h)
-    {
-        Logger log = Log.lookup("Handlers.onSend");
-     
-        char[] outbuf = h.nextBuffer();
-        if (outbuf !is null)
-        {
-            int sent = h.transport.write(outbuf);
-            if (sent > 0)
-            {
-                if (! h.addOffset(sent))
-                {
-                    h.remEvent(Event.Write);
-                    return REREGISTER;
-                }
-            }
-            else if (sent == AsyncSocketConduit.Eof)
-            {
-                log.error("Select said socket was writable, but sent 0 bytes");
-            }
-            else
-            {
-               log.error("Socket send return ERR");
-            }
-            return REMAIN;
-        }
-        else
-        {
-            h.remEvent(Event.Write);
-            return REREGISTER;
-        }
-    }
-
-    static int onHangup(Dispatcher d)
-    {
-        return UNREGISTER;
-    }
-
-    static int onError(Dispatcher d, RegisterD unreg)
-    {
-        return CLOSE;
-    } 
-
-}
-
-bool includes(Dispatcher[] haystack, Dispatcher needle)
-{
-    foreach(Dispatcher h; haystack)
-    {
-        if (h is needle)
-            return true;
-    }
-    return false;
-}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/TcpProvider.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,252 @@
+module dreactor.protocol.RawTcp;
+
+import tango.io.device.Conduit;
+import tango.io.selector.model.ISelector;
+import tango.net.Socket;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+import tango.util.log.Config;
+
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.Vat;
+import dreactor.core.Dispatcher;
+
+/******************************************************************************
+    
+    Basic TCP server or client routines for sending raw data.
+
+******************************************************************************/
+class TCPProvider : IProvider
+{
+public
+
+    enum 
+    {
+        RECEIVE = 1000,
+        SEND_COMPLETE,
+        NEW_CONNECTION,
+        REMOTE_CLOSED,
+        SEND_ERROR,
+        RECEIVE_ERROR,
+        ERROR
+    }
+
+    this(AsyncSocketConduit c)
+    {
+        log = Log.lookup("dreactor.protocol.RawTcpServer");
+        log.info("log initialized");
+        cond = c;
+    }
+
+    this(Vat v, IPv4Address addr)
+    {
+        AsyncSocketConduit cond = new AsyncSocketConduit;
+        cond.socket().setAddressReuse(true);
+        this(cond);
+    }
+
+    ~this()
+    {
+        close();
+    } 
+    
+    Message handleRead(Conduit c)
+    {
+        Logger log = Log.lookup("Handlers.onReceive");
+
+        char inbuf[8192];
+        int amt;
+        if((amt = h.transport.read(inbuf)) > 0)
+        {
+            return new Message(inbuf[0 .. amt].dup, RECEIVE, amt);
+        }
+        else
+        {
+            if (amt == 0)
+            {
+                children.remove(h);
+                (cast(AsyncSocketConduit) h.transport).shutdown();
+                return Message(null, REMOTE_CLOSED, amt);
+            }
+            log.error("Received no data, err = {}", amt);
+        }
+        return new Message(null, ERROR, amt);
+    }
+    
+    /**************************************************************************
+
+        handleWrite
+        To be registered as the response to socket writable event. 
+        Sends data, returns amount sent. Unregisters Handler for sending
+        if there is no more data left to send. 
+
+    ***************************************************************************/
+    Message handleWrite(Conduit c)
+    {
+        Logger log = Log.lookup("Handlers.onSend");
+     
+        char[] outbuf = nextBuffer();
+        if (outbuf !is null)
+        {
+            int sent = cond.write(outbuf);
+            if (sent > 0)
+            {
+                if (! addOffset(sent))
+                {
+                    //h.remEvent(Event.Write);
+                    //TODO - How do we handle event re-registering
+                    return new Message(null, SEND_COMPLETE, sent);
+                }
+            }
+            else if (sent == 0)
+            {
+                log.error("Select said socket was writable, but sent 0 bytes");
+                return new Message(null, SEND_ERROR, 0);
+            }
+            else
+            {
+                log.error("Socket send return ERR");
+                return new Message(null, SEND_ERROR, sent);
+            }
+        }
+        else
+        {
+            //h.remEvent(Event.Write);
+            //TODO - How do we handle event re-registering
+        
+            return new Message(null, SEND_COMPLETE, 0);
+        }
+    }
+
+    Message handleDisconnect(Conduit c)
+    {
+        return new Message(c, REMOTE_CLOSED, 0);
+    }
+
+    Message handleError(Conduit c)
+    {
+        return new Messsage(null, ERROR, 0);
+    } 
+
+    Message handleConnect(Conduit c)
+    {
+        return new Message(accept(), NEW_CONNECTION, 0);
+    }
+
+    Conduit getConduit()
+    {
+        return cond;
+    }
+
+    int getEvents()
+    {
+        return events;
+    }
+
+    void setEvents(Event e)
+    {
+        events = e;
+    }
+
+    AsyncSocketConduit accept()
+    {
+        AsyncSocketConduit newcond = new AsyncSocketConduit;
+        cond.socket().accept(newcond.socket);
+        log.info("accepted new connection");
+        return newcond;
+    }
+
+    int broadcast(char[] outbuf, TCPProvider[] recips)
+    {
+        foreach(TCPProvider c; recips)
+        {
+            if (c.appendOutBuffer(outbuf))
+            {
+                h.addEvent(Event.Write);
+                vat.addConnection(h);
+            }
+        }
+        return 0;
+    }
+    
+    /**************************************************************************
+    
+        send
+        User-called function to send data to the counterpart at the other
+        end of the connection. This sets up the connection manager to send
+        data as the socket becomes free. 
+
+    **************************************************************************/
+    int send(char[] outbufl)
+    {
+        if (appendOutBuffer(outbuf))
+        {
+            //TODO - should we always register for all events? or update it when needed?
+            //d.addEvent(Event.Write);
+            if (!vat.addConnection(d))
+            {
+                log.error("unable to register mgr");
+            }
+        }
+        return 0;
+    }
+
+ 
+    void close()
+    {
+        cond.shutdown();
+        cond.detach(); 
+    }
+    
+
+    ~this()
+    {
+        (cast(AsyncSocketConduit)manager.transport).shutdown();
+        (cast(AsyncSocketConduit)manager.transport).detach();
+    }
+
+    int connect(IPv4Address addr)
+    {
+        cond = new AsyncSocketConduit;
+        cond.socket().setAddressReuse(true);
+        
+        cond.connect(addr);
+        connected = true;
+        log.info("connected to {}", addr);
+        return 0;
+    } 
+
+    /**************************************************************************
+    
+        send
+        User-called function to send data to the counterpart at the other
+        end of the connection. This sets up the connection manager to send
+        data as the socket becomes free. 
+
+    **************************************************************************/
+    int send(char[] outbuf, IPv4Address addr = null)
+    {
+        if (!connected)
+        {
+            log.info("send: not connected, connecting");
+            return -1;
+        }
+        if (appendOutBuffer(outbuf))
+        {
+            addEvent(Event.Write);
+            if (!vat.addConnection(manager))
+            {
+                log.error("unable to register mgr");
+            }
+        }
+        return 0;
+    }
+    
+    
+private
+    Vat vat;
+    Conduit cond;
+    Logger log;
+    bool listener;
+    Event events;
+}
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/dreactor/protocol/UdpProvider.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,133 @@
+module dreactor.protocol.Raw;
+
+import tango.io.Conduit;
+import tango.io.selector.model.ISelector;
+import dreactor.core.AsyncConduit;
+import dreactor.core.SelectLoop;
+import dreactor.core.ConnectionHandler;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+import tango.util.log.Configurator;
+
+Logger log = Log.getLogger("dreactor.core.SelectLoop");
+
+/******************************************************************************
+    
+    Basic TCP server or client routines for sending raw data.
+
+******************************************************************************/
+class RawListener
+{
+public
+ 
+    this(ConnectionHandler mgr, SelectLoop sel)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        sel.addConnection(mgr);
+        select = sel;
+        children = CircularSeq!(ConnectionHandler);
+        Configurator();
+    }
+ 
+    int accept(Conduit cond)
+    {
+        AsyncConduit newcond = new AsyncConduit;
+        cond.socket().accept(newcond.socket);
+        ConnectionHandler h = ConnectionHandler.New(manager);
+        mgr.events(Event.Read);
+        select.addConnection(mgr);
+        children.append(mgr);
+    }
+
+    bool broadcast(char[] outbuf)
+    {
+        foreach(ConnectionHandler h; children)
+        {
+            if (h.appendBuffer(outbuf))
+            {
+                h.addEvent(Event.Write);
+                select.addConnection(h);
+            }
+        }
+    }
+ 
+    void close()
+    {
+        
+    }
+
+    /**************************************************************************
+
+        send
+        OutgoingHandlerD 
+        To be registered as the response to socket writable event. 
+        Sends data, returns amount sent. Unregisters Handler for sending
+        if there is no more data left to send. 
+
+    ***************************************************************************/
+    int send(ConnectionHandler h, RegisterD reg)
+    {
+        char[] outbuf = h.nextBuffer();
+        if (!outbuf is null)
+        {
+            int sent = h.transport.write(outbuf);
+            if (sent > 0)
+            {
+                if (! h.addOffset(sent))
+                {
+                    h.removeEvent(Event.write);
+                    reg(h);
+                }
+            }
+            else if (sent == EOF)
+            {
+                // EAGAIN ? probably shouldn't have happened. 
+            }
+            else
+            {
+               log.error("Socket send return ERR"); 
+            }
+            return sent;
+        }
+        return 0;
+    }
+
+    /**************************************************************************
+    
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    int receive(ConnectionHandler h, RegisterD reg)
+    {
+        char inbuf[8192];
+        auto format = Log.format;
+        if(h.transport.read(inbuf) > 0)
+            log.info(format("Received Buffer: {}", inbuf));
+    }
+
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+    CircularSeq!(ConnectionHandler) children;
+}
+
+class RawClient
+{
+public
+    this(ConnectionHandler mgr, SelectLoop sel)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        sel.addConnection(mgr);
+        select = sel;
+    }
+
+    
+
+private
+    ConnectionHandler manager;
+    SelectLoop select;
+}
--- a/dreactor/transport/AsyncSocketConduit.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/dreactor/transport/AsyncSocketConduit.d	Wed Aug 27 00:47:33 2008 -0400
@@ -16,7 +16,7 @@
 
 private import  tango.time.Time;
 
-public  import  tango.io.Conduit;
+public  import  tango.io.device.Conduit;
 
 private import  tango.net.Socket;
 
--- a/dsss.conf	Tue Aug 12 16:59:56 2008 -0400
+++ b/dsss.conf	Wed Aug 27 00:47:33 2008 -0400
@@ -1,4 +1,4 @@
-[test/test.d]
-[test/longtest.d]
-[test/chatserver.d]
+#[test/test.d]
+#[test/longtest.d]
+#[test/chatserver.d]
 [test/chatclient.d]
--- a/dsss.last	Tue Aug 12 16:59:56 2008 -0400
+++ b/dsss.last	Wed Aug 27 00:47:33 2008 -0400
@@ -1,5 +1,4 @@
-[test/test.d]
-[test/longtest.d]
-[test/chatserver.d]
+#[test/test.d]
+#[test/longtest.d]
+#[test/chatserver.d]
 [test/chatclient.d]
-[dreactor/protocol/http11_parser.d]
Binary file test/async/chatclient has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/async/chatclient.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,39 @@
+module chatclient;
+
+import tango.net.Socket;
+import tango.core.Thread;
+import tango.io.Stdout;
+import tango.io.Console;
+import tango.util.log.Log;
+
+import dreactor.core.Vat; 
+import dreactor.core.Dispatcher;
+
+import dreactor.protocol.RawTcp;
+import dreactor.transport.AsyncSocketConduit;
+
+int main()
+{ 
+    Vat c_vat = new Vat();
+    RawTCPClient client = new RawTCPClient(c_vat);
+    Log.root.level(log.Level.Warn, true); 
+
+    client.setDataHandler( (char[] inbuf, Dispatcher d) {
+        Stdout(inbuf)();
+    });
+ 
+    c_vat.run();
+    client.connect(new IPv4Address("localhost", 5555));
+    
+    while(true)
+    {
+        char buf[] = Cin.copyln(true);
+        if (buf == "quit\n")
+            break;
+        client.send(buf);             
+    }
+    c_vat.exit();
+    delete client;
+    return 0;
+}
+
Binary file test/async/chatserver has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/async/chatserver.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,29 @@
+module chatserver;
+
+import tango.net.Socket;
+import tango.core.Thread;
+import tango.io.Stdout;
+import tango.util.log.Log;
+import dreactor.core.Vat; 
+import dreactor.core.Dispatcher;
+
+import dreactor.protocol.RawTcp;
+import dreactor.transport.AsyncSocketConduit;
+
+int main()
+{ 
+    Vat l_vat = new Vat();
+    Logger log = Log.lookup("dreactor.chatserver");
+    Log.root.level(log.Level.Info, true); 
+    RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555));
+
+    listener.setDataHandler( (char[] inbuf, Dispatcher d) {
+    
+        listener.broadcast(inbuf, [d]);
+
+    });
+    l_vat.run();
+
+    return 0;
+}
+
Binary file test/async/longtest has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/async/longtest.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,273 @@
+module longtest;
+
+import tango.net.Socket;
+import tango.core.Thread;
+import tango.io.Stdout;
+import dreactor.core.Vat; 
+import dreactor.core.Dispatcher;
+import dreactor.protocol.RawTcp;
+import dreactor.transport.AsyncSocketConduit;
+
+
+int main()
+{ 
+    AsyncSocketConduit cond = new AsyncSocketConduit;
+    Dispatcher lh = new Dispatcher(cond, true);
+    Vat l_vat = new Vat();
+    RawTCPListener listener = new RawTCPListener(lh, l_vat, new IPv4Address(5555)); 
+    l_vat.run();
+
+    AsyncSocketConduit clcond = new AsyncSocketConduit;
+    Dispatcher ch = new Dispatcher(clcond);
+    Vat c_vat = new Vat();
+    RawTCPClient client = new RawTCPClient(ch, c_vat);
+    c_vat.run(); //run, vat, run!
+
+    client.connect(new IPv4Address("localhost", 5555));
+    //Thread.sleep(1); 
+    client.send(testbuffer);
+    return 0;
+}
+
+char testbuffer[] = 
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000"
+        "00000000000000000000000000000000000000000000000000";
+
Binary file test/async/test has changed
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/test/async/test.d	Wed Aug 27 00:47:33 2008 -0400
@@ -0,0 +1,30 @@
+module test;
+
+import tango.net.Socket;
+import tango.core.Thread;
+import tango.io.Stdout;
+import dreactor.core.Vat; 
+import dreactor.core.Dispatcher;
+import dreactor.protocol.RawTcp;
+import dreactor.transport.AsyncSocketConduit;
+
+int main()
+{ 
+    AsyncSocketConduit cond = new AsyncSocketConduit;
+    Dispatcher lh = new Dispatcher(cond, true);
+    Vat l_vat = new Vat();
+    RawTCPListener listener = new RawTCPListener(lh, l_vat, new IPv4Address(5555)); 
+    l_vat.run();
+
+    AsyncSocketConduit clcond = new AsyncSocketConduit;
+    Dispatcher ch = new Dispatcher(clcond);
+    Vat c_vat = new Vat();
+    RawTCPClient client = new RawTCPClient(ch, c_vat);
+    c_vat.run();
+
+    client.connect(new IPv4Address("localhost", 5555));
+    //Thread.sleep(1); 
+    client.send("This is a test");
+    return 0;
+}
+
--- a/test/chatclient.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/test/chatclient.d	Wed Aug 27 00:47:33 2008 -0400
@@ -1,39 +1,72 @@
+
 module chatclient;
 
-import tango.net.Socket;
-import tango.core.Thread;
-import tango.io.Stdout;
-import tango.io.Console;
-import tango.util.log.Log;
+import dreactor.core.Vat;
+import dreactor.core.Task;
+import dreactor.protocol.TcpProvider;
 
-import dreactor.core.Vat; 
-import dreactor.core.Dispatcher;
+enum {EMITTER_CHAT_RECEIVE = 42};
+
+class ChatTask : Task
+{
 
-import dreactor.protocol.RawTcp;
-import dreactor.transport.AsyncSocketConduit;
+private
+    TcpProvider client;
 
-int main()
-{ 
-    Vat c_vat = new Vat();
-    RawTCPClient client = new RawTCPClient(c_vat);
-    Log.root.level(log.Level.Warn, true); 
+public
+    this(TcpProvider tcpclient) 
+    {
+        super(tcpclient);
+    }
+
+    void run()
+    {
+        Message msg;
 
-    client.setDataHandler( (char[] inbuf, Dispatcher d) {
-        Stdout(inbuf)();
-    });
- 
-    c_vat.run();
-    client.connect(new IPv4Address("localhost", 5555));
-    
-    while(true)
-    {
-        char buf[] = Cin.copyln(true);
-        if (buf == "quit\n")
-            break;
-        client.send(buf);             
+        auto em = new Emitter(this, 
+                {
+                     char buf[] = Cin.copyln(true);
+                     return new Message(buf, EMITTER_STDIN_RECEIVE, buf.size);
+                });
+        
+        while (msg = receive())
+        {
+            switch(msg.type)
+            {
+                case EMITTER_CHAT_RECEIVE:
+                    char[] inbuf = msg.payload;
+                    if (inbuf == "quit")
+                    {
+                        em.stopNow();
+                        return;
+                    }
+                    client.send(msg.payload);
+                    break;
+
+                case TcpProvider.RECEIVE:
+                    Stdout(cast(char[]) msg.payload);
+                    break;
+
+                case TcpProvider.SEND_COMPLETE:
+                    break;
+
+                case TcpProvider.REMOTE_CLOSED:
+                    Stdout("--- Remote host closed connection \n");
+                    break;
+
+                default:
+                    Stdout("Unknown message received\n");
+            }
+        }
+        em.stopNow();
     }
-    c_vat.exit();
-    delete client;
-    return 0;
 }
 
+
+int main(int argc, char[][] argv)
+{
+    auto vat = new Vat;
+    auto client = new TcpProvider(new IPv4Address("localhost", 5555), vat);
+    auto tsk = new ChatTask(client);
+    vat.addTask(task);
+}
--- a/test/chatserver.d	Tue Aug 12 16:59:56 2008 -0400
+++ b/test/chatserver.d	Wed Aug 27 00:47:33 2008 -0400
@@ -1,29 +1,66 @@
+
 module chatserver;
 
-import tango.net.Socket;
-import tango.core.Thread;
-import tango.io.Stdout;
-import tango.util.log.Log;
-import dreactor.core.Vat; 
-import dreactor.core.Dispatcher;
+import dreactor.core.Vat;
+import dreactor.core.Task;
+import dreactor.protocol.TcpProvider;
 
-import dreactor.protocol.RawTcp;
-import dreactor.transport.AsyncSocketConduit;
+
+class ChatConnectionTask : Task
+{
+public
 
-int main()
-{ 
-    Vat l_vat = new Vat();
-    Logger log = Log.lookup("dreactor.chatserver");
-    Log.root.level(log.Level.Info, true); 
-    RawTCPListener listener = new RawTCPListener(l_vat, new IPv4Address(5555));
+    void run()
+    {
+        Message msg;
+        while (msg = receive())
+        {
+            switch(msg.type)
+            {
+                case TCP_PROVIDER_RECEIVE:
+                    //Stdout(cast(char[]) msg.payload);
+                    break;
+                case TCP_PROVIDER_SEND_COMPLETE:
+                    break;
+                case TCP_PROVIDER_REMOTE_CLOSED:
+                    Stdout("--- Remote host closed connection \n");
+                    break;
+                default:
+                    Stdout("Unknown message received\n");
+            }
+        }
+        em.stopNow();
+    }
 
-    listener.setDataHandler( (char[] inbuf, Dispatcher d) {
-    
-        listener.broadcast(inbuf, [d]);
+    void send(char[] buf)
+    {
+        tcp.send(buf);
+    }
 
-    });
-    l_vat.run();
-
-    return 0;
+    static CircularList!(ChatConnectionTask!(
 }
 
+
+int main(int argc, char[][] argv)
+{
+    auto vat = new Vat;
+ 
+    void listentask(Message msg)
+    {
+        switch(msg.type)
+        {
+            case TCP_PROVIDER_CONNECT:
+                AsyncSocketConduit cond = cast(AsyncSocketConduit) msg.payload;
+                auto tsk = ChatConnectionTask(new TcpProvider(cond));
+                vat.addTask(tsk);
+                break;
+            default:
+                Stdout("Unknown message received\n");
+        }
+    }
+
+
+    auto provider = new TcpProvider(new IPv4Address("localhost", 5555), vat);
+    auto srvtsk = new Task(&listentask, provider);
+    vat.addTask(task, client);
+}