diff asyncdreactor/protocol/RawTcp.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents dreactor/protocol/RawTcp.d@e75a2e506b1d
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/asyncdreactor/protocol/RawTcp.d	Tue Aug 12 16:59:56 2008 -0400
@@ -0,0 +1,345 @@
+module dreactor.protocol.RawTcp;
+
+import tango.io.Conduit;
+import tango.io.selector.model.ISelector;
+import tango.net.Socket;
+import tango.util.collection.CircularSeq;
+import tango.util.log.Log;
+import tango.util.log.Config;
+
+import dreactor.transport.AsyncSocketConduit;
+import dreactor.core.Vat;
+import dreactor.core.Dispatcher;
+
+/******************************************************************************
+    
+    Basic TCP server or client routines for sending raw data.
+
+******************************************************************************/
+class RawTCPListener
+{
+public
+    this(Dispatcher mgr, Vat sel, IPv4Address addr)
+    {
+        manager = mgr;
+        mgr.events(Event.Read);
+        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
+        mgr.setIncomingHandler(&onReceive);
+        mgr.setConnectHandler(&accept);
+        mgr.setErrorHandler(&RawTCPHandler.onError);
+        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
+        mgr.listen(addr);
+         
+        sel.addConnection(mgr);
+        vat = sel;
+        log = Log.lookup("dreactor.protocol.RawTcpServer");
+        log.info("log initialized");
+        children = new CircularSeq!(Dispatcher);
+    }
+    this(Vat sel, IPv4Address addr)
+    {
+        AsyncSocketConduit cond = new AsyncSocketConduit;
+        cond.socket().setAddressReuse(true);
+        Dispatcher lh = new Dispatcher(cond, true);
+        this(lh, sel, addr);
+    }
+
+    ~this()
+    {
+        close();
+    } 
+
+    int accept(Conduit cond, RegisterD reg)
+    {
+        AsyncSocketConduit newcond = new AsyncSocketConduit;
+        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
+        Dispatcher h = Dispatcher.New(newcond, manager);
+        h.events(Event.Read);
+        vat.addConnection(h);
+        children.append(h);
+        log.info("accepted new connection");
+        return 0;
+    }
+
+    int broadcast(char[] outbuf, Dispatcher[] excluded = null)
+    {
+        foreach(Dispatcher h; children)
+        {
+            if (excluded && excluded.includes(h))
+                continue;
+            if (h.appendOutBuffer(outbuf))
+            {
+                h.addEvent(Event.Write);
+                vat.addConnection(h);
+            }
+        }
+        return 0;
+    }
+    
+    /**************************************************************************
+    
+        send
+        User-called function to send data to the counterpart at the other
+        end of the connection. This sets up the connection manager to send
+        data as the socket becomes free. 
+
+    **************************************************************************/
+    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
+    {
+        if (d.appendOutBuffer(outbuf))
+        {
+            d.addEvent(Event.Write);
+            if (!vat.addConnection(d))
+            {
+                log.error("unable to register mgr");
+            }
+        }
+        return 0;
+    }
+
+    /**************************************************************************
+
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    int onReceive(Dispatcher h)
+    {
+        Logger log = Log.lookup("Handlers.onReceive");
+
+        char inbuf[8192];
+        int amt;
+        if((amt = h.transport.read(inbuf)) > 0)
+        {
+            if (dataHandler)
+                dataHandler(inbuf[0 .. amt], h);
+            else
+                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
+        }
+        else
+        {
+            if (amt == 0)
+            {
+                children.remove(h);
+                (cast(AsyncSocketConduit) h.transport).shutdown();
+                return CLOSE;
+            }
+            log.error("Received no data, err = {}", amt);
+        }
+        return REMAIN;
+    }
+ 
+    void close()
+    {
+        foreach(Dispatcher d; children)
+        {
+            (cast(AsyncSocketConduit)d.transport).shutdown();
+            (cast(AsyncSocketConduit)d.transport).detach();
+        }
+        (cast(AsyncSocketConduit)manager.transport).shutdown();
+        (cast(AsyncSocketConduit)manager.transport).detach(); 
+        
+    }
+    
+    void setDataHandler(void delegate(char[], Dispatcher) h)
+    {
+        dataHandler = h;
+    }
+
+private
+    Vat vat;
+    CircularSeq!(Dispatcher) children;
+    Dispatcher manager;
+    Logger log;
+    RawTCPHandler h;
+    void delegate(char[], Dispatcher) dataHandler;
+}
+
+class RawTCPClient
+{
+
+public
+    this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
+    {
+        manager = mgr;
+        manager.events(evts);
+        connected = false;
+        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
+        mgr.setIncomingHandler(&onReceive);
+        mgr.setErrorHandler(&RawTCPHandler.onError);
+        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
+        vat = sel;
+        log = Log.lookup("dreactor.protocol.RawTcpClient");
+    }
+
+    this(Vat sel, Event evts = Event.Read)
+    {
+        AsyncSocketConduit clcond = new AsyncSocketConduit;
+        Dispatcher ch = new Dispatcher(clcond);
+        this(ch, sel, evts);
+    }
+
+    ~this()
+    {
+        (cast(AsyncSocketConduit)manager.transport).shutdown();
+        (cast(AsyncSocketConduit)manager.transport).detach();
+    }
+
+    int connect(IPv4Address addr)
+    {
+        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
+        vat.addConnection(manager);
+        connected = true;
+        log.info("connected to {}", addr);
+        return 0;
+    } 
+
+    /**************************************************************************
+    
+        send
+        User-called function to send data to the counterpart at the other
+        end of the connection. This sets up the connection manager to send
+        data as the socket becomes free. 
+
+    **************************************************************************/
+    int send(char[] outbuf, IPv4Address addr = null)
+    {
+        if (!connected)
+        {
+            log.info("send: not connected, connecting");
+            if (addr !is null)
+            {
+                if (0 > connect(addr))
+                {
+                    log.error("send: unable to connect");
+                    return -1;
+                }
+            }
+        }
+        if (manager.appendOutBuffer(outbuf))
+        {
+            manager.addEvent(Event.Write);
+            if (!vat.addConnection(manager))
+            {
+                log.error("unable to register mgr");
+            }
+        }
+        return 0;
+    }
+    
+    /**************************************************************************
+
+        receive
+        IncomingHandlerD
+        Default incoming data handler. Should be replaced with something useful.
+
+    **************************************************************************/ 
+    public int onReceive(Dispatcher h)
+    {
+        Logger log = Log.lookup("Handlers.onReceive");
+
+        char inbuf[8192];
+        int amt;
+        if((amt = h.transport.read(inbuf)) > 0)
+        {
+            if (dataHandler)
+                dataHandler(inbuf[0 .. amt], h);
+            else
+                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
+        }
+        else
+        {
+            if (amt == 0)
+            {
+                return CLOSE;
+            }
+            log.error("Received no data, err = {}", amt);
+        }
+        return REMAIN;
+    }
+    
+    void setDataHandler(void delegate(char[], Dispatcher) h)
+    {
+        dataHandler = h;
+    }
+private
+    void delegate(char[], Dispatcher) dataHandler;
+    Dispatcher manager;
+    Vat vat;
+    bool connected;
+    Logger log;
+    RawTCPHandler h;
+}
+
+
+/******************************************************************************
+
+    Default Event handlers common to both listener/clients
+
+******************************************************************************/
+struct RawTCPHandler
+{
+    /**************************************************************************
+
+        onSend
+        OutgoingHandlerD 
+        To be registered as the response to socket writable event. 
+        Sends data, returns amount sent. Unregisters Handler for sending
+        if there is no more data left to send. 
+
+    ***************************************************************************/
+    public static int onSend(Dispatcher h)
+    {
+        Logger log = Log.lookup("Handlers.onSend");
+     
+        char[] outbuf = h.nextBuffer();
+        if (outbuf !is null)
+        {
+            int sent = h.transport.write(outbuf);
+            if (sent > 0)
+            {
+                if (! h.addOffset(sent))
+                {
+                    h.remEvent(Event.Write);
+                    return REREGISTER;
+                }
+            }
+            else if (sent == AsyncSocketConduit.Eof)
+            {
+                log.error("Select said socket was writable, but sent 0 bytes");
+            }
+            else
+            {
+               log.error("Socket send return ERR");
+            }
+            return REMAIN;
+        }
+        else
+        {
+            h.remEvent(Event.Write);
+            return REREGISTER;
+        }
+    }
+
+    static int onHangup(Dispatcher d)
+    {
+        return UNREGISTER;
+    }
+
+    static int onError(Dispatcher d, RegisterD unreg)
+    {
+        return CLOSE;
+    } 
+
+}
+
+bool includes(Dispatcher[] haystack, Dispatcher needle)
+{
+    foreach(Dispatcher h; haystack)
+    {
+        if (h is needle)
+            return true;
+    }
+    return false;
+}