diff dreactor/protocol/TcpProvider.d @ 13:8c9b1276f623 default tip

bug fixes
author rick@minifunk
date Sat, 20 Sep 2008 18:33:11 -0400
parents d6a3cfe7c3de
children
line wrap: on
line diff
--- a/dreactor/protocol/TcpProvider.d	Wed Aug 27 00:47:33 2008 -0400
+++ b/dreactor/protocol/TcpProvider.d	Sat Sep 20 18:33:11 2008 -0400
@@ -1,34 +1,31 @@
-module dreactor.protocol.RawTcp;
+module dreactor.protocol.TcpProvider;
 
 import tango.io.device.Conduit;
 import tango.io.selector.model.ISelector;
 import tango.net.Socket;
-import tango.util.collection.CircularSeq;
+import tango.util.container.CircularList;
 import tango.util.log.Log;
 import tango.util.log.Config;
 
 import dreactor.transport.AsyncSocketConduit;
 import dreactor.core.Vat;
-import dreactor.core.Dispatcher;
-
+public import dreactor.protocol.IProvider;
 /******************************************************************************
     
     Basic TCP server or client routines for sending raw data.
 
 ******************************************************************************/
-class TCPProvider : IProvider
+class TcpProvider : IProvider
 {
 public
-
-    enum 
-    {
-        RECEIVE = 1000,
-        SEND_COMPLETE,
-        NEW_CONNECTION,
-        REMOTE_CLOSED,
-        SEND_ERROR,
-        RECEIVE_ERROR,
-        ERROR
+    enum {
+        SendComplete = 2000,
+        NewConnection,
+        Receive,
+        RemoteClosed,
+        SendError,
+        ReceiveError,
+        Error
     }
 
     this(AsyncSocketConduit c)
@@ -36,41 +33,53 @@
         log = Log.lookup("dreactor.protocol.RawTcpServer");
         log.info("log initialized");
         cond = c;
+        events = Event.Read;
     }
 
-    this(Vat v, IPv4Address addr)
+    this(IPv4Address addr, bool listen = false)
     {
-        AsyncSocketConduit cond = new AsyncSocketConduit;
-        cond.socket().setAddressReuse(true);
-        this(cond);
+        AsyncSocketConduit c = new AsyncSocketConduit;
+        c.socket().setAddressReuse(true);
+        if (listen)
+        {
+            c.bind(addr);
+            c.socket().listen(1000);
+            listener = listen;
+        }
+        else
+            c.connect(addr);
+        this(c);
     }
 
+    
     ~this()
     {
         close();
     } 
     
-    Message handleRead(Conduit c)
+    Message handleRead()
     {
         Logger log = Log.lookup("Handlers.onReceive");
 
+        if (listener)
+            return handleConnect();
+
         char inbuf[8192];
         int amt;
-        if((amt = h.transport.read(inbuf)) > 0)
+        if((amt = cond.read(inbuf)) > 0)
         {
-            return new Message(inbuf[0 .. amt].dup, RECEIVE, amt);
+            return Message(inbuf[0 .. amt].dup.ptr, Receive, amt);
         }
         else
         {
             if (amt == 0)
             {
-                children.remove(h);
-                (cast(AsyncSocketConduit) h.transport).shutdown();
-                return Message(null, REMOTE_CLOSED, amt);
+                cond.shutdown();
+                return Message(null, RemoteClosed, amt);
             }
             log.error("Received no data, err = {}", amt);
         }
-        return new Message(null, ERROR, amt);
+        return Message(null, Error, amt);
     }
     
     /**************************************************************************
@@ -81,7 +90,7 @@
         if there is no more data left to send. 
 
     ***************************************************************************/
-    Message handleWrite(Conduit c)
+    Message handleWrite()
     {
         Logger log = Log.lookup("Handlers.onSend");
      
@@ -95,42 +104,45 @@
                 {
                     //h.remEvent(Event.Write);
                     //TODO - How do we handle event re-registering
-                    return new Message(null, SEND_COMPLETE, sent);
+                    return Message(null, SendComplete, sent);
                 }
             }
             else if (sent == 0)
             {
                 log.error("Select said socket was writable, but sent 0 bytes");
-                return new Message(null, SEND_ERROR, 0);
+                return Message(null, Error, 0);
             }
             else
             {
                 log.error("Socket send return ERR");
-                return new Message(null, SEND_ERROR, sent);
+                return Message(null, Error, sent);
             }
         }
         else
         {
-            //h.remEvent(Event.Write);
-            //TODO - How do we handle event re-registering
-        
-            return new Message(null, SEND_COMPLETE, 0);
+            remEvent(Event.Write);
+            if (!regFn(events))
+            {
+                log.error("unable to register mgr");
+            }
+            return Message(null, SendComplete, 0);
         }
     }
 
-    Message handleDisconnect(Conduit c)
+    Message handleDisconnect()
     {
-        return new Message(c, REMOTE_CLOSED, 0);
+        return Message(cast(void*)cond, RemoteClosed, 0);
     }
 
-    Message handleError(Conduit c)
+    Message handleError()
     {
-        return new Messsage(null, ERROR, 0);
+        return Message(cast(void*)cond, Error, 0);
     } 
 
-    Message handleConnect(Conduit c)
+    Message handleConnect()
     {
-        return new Message(accept(), NEW_CONNECTION, 0);
+        log.trace("accepting new connection");
+        return Message(cast(void*)accept(), NewConnection, 0);
     }
 
     Conduit getConduit()
@@ -138,7 +150,7 @@
         return cond;
     }
 
-    int getEvents()
+    Event getEvents()
     {
         return events;
     }
@@ -152,22 +164,9 @@
     {
         AsyncSocketConduit newcond = new AsyncSocketConduit;
         cond.socket().accept(newcond.socket);
-        log.info("accepted new connection");
+        log.info("accepted new connection {} {}", cast(uint) newcond, newcond.fileHandle());
         return newcond;
     }
-
-    int broadcast(char[] outbuf, TCPProvider[] recips)
-    {
-        foreach(TCPProvider c; recips)
-        {
-            if (c.appendOutBuffer(outbuf))
-            {
-                h.addEvent(Event.Write);
-                vat.addConnection(h);
-            }
-        }
-        return 0;
-    }
     
     /**************************************************************************
     
@@ -177,18 +176,16 @@
         data as the socket becomes free. 
 
     **************************************************************************/
-    int send(char[] outbufl)
+    void send(char[] outbuf)
     {
         if (appendOutBuffer(outbuf))
         {
-            //TODO - should we always register for all events? or update it when needed?
-            //d.addEvent(Event.Write);
-            if (!vat.addConnection(d))
+            addEvent(Event.Write);
+            if (!regFn(events))
             {
                 log.error("unable to register mgr");
             }
         }
-        return 0;
     }
 
  
@@ -198,13 +195,6 @@
         cond.detach(); 
     }
     
-
-    ~this()
-    {
-        (cast(AsyncSocketConduit)manager.transport).shutdown();
-        (cast(AsyncSocketConduit)manager.transport).detach();
-    }
-
     int connect(IPv4Address addr)
     {
         cond = new AsyncSocketConduit;
@@ -218,35 +208,94 @@
 
     /**************************************************************************
     
-        send
-        User-called function to send data to the counterpart at the other
-        end of the connection. This sets up the connection manager to send
-        data as the socket becomes free. 
+        appendOutBuffer
+
+        Adds an outgoing buffer to the list. This returns true if the list
+        was empty, indicating that the handler should be registered with the
+        SelectLoop. If it returns false, it was probably already registered.
+        
+    **************************************************************************/
+    bool appendOutBuffer(char[] outbuf)
+    {
+        out_buffers.append(outbuf);
+        out_buffers_len++;
+        if (out_buffers_len == 1)
+            return true;
+        else
+            return false;
+    }
+
+    /**************************************************************************
+
+        addOffset 
+        Use this function to update the offset position after a successful data
+        send. This not only manages the current offset, but will update the 
+        out buffer chain if necessary. 
+
+        Returns: false if there is nothing left to send, true if there is.
+
+    **************************************************************************/ 
+    bool addOffset(int off)
+    in
+    {
+        assert(out_buffers_len > 0);
+    }
+    body
+    {
+        char[] hd = out_buffers.head();
+        if ((off + o_offset) >= hd.length)
+        {
+            out_buffers.removeHead();
+            o_offset = 0;
+            out_buffers_len--;
+            return (out_buffers_len > 0);
+        }
+        else
+            o_offset += off;
+        return true;
+    }
+
+    /**************************************************************************
+
+        char[] nextBuffer
+
+        Returns a slice of the current outbound buffer, returns a char[] pointing
+        to null if there is no current outbound buffer
 
     **************************************************************************/
-    int send(char[] outbuf, IPv4Address addr = null)
+    char[] nextBuffer()
     {
-        if (!connected)
+        if (out_buffers_len < 1)
         {
-            log.info("send: not connected, connecting");
-            return -1;
+            return null; 
         }
-        if (appendOutBuffer(outbuf))
-        {
-            addEvent(Event.Write);
-            if (!vat.addConnection(manager))
-            {
-                log.error("unable to register mgr");
-            }
-        }
-        return 0;
+
+        return out_buffers.head()[o_offset .. $];
     }
     
-    
+    void setRegisterFunc( bool delegate (Event) fn)
+    {
+        regFn = fn;
+    }
+
+    void addEvent(Event evt)
+    {
+        events |= evt;
+    }
+
+    void remEvent(Event evt)
+    {
+        events &= !evt;
+    }
+
 private
-    Vat vat;
-    Conduit cond;
+    AsyncSocketConduit cond;
     Logger log;
     bool listener;
     Event events;
+    bool connected;
+    CircularList!(char[]) out_buffers;
+    int out_buffers_len;
+    int o_offset;
+    bool delegate (Event) regFn;
 }