view asyncdreactor/protocol/RawTcp.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents dreactor/protocol/RawTcp.d@e75a2e506b1d
children
line wrap: on
line source

module dreactor.protocol.RawTcp;

import tango.io.Conduit;
import tango.io.selector.model.ISelector;
import tango.net.Socket;
import tango.util.collection.CircularSeq;
import tango.util.log.Log;
import tango.util.log.Config;

import dreactor.transport.AsyncSocketConduit;
import dreactor.core.Vat;
import dreactor.core.Dispatcher;

/******************************************************************************
    
    Basic TCP server or client routines for sending raw data.

******************************************************************************/
class RawTCPListener
{
public
    this(Dispatcher mgr, Vat sel, IPv4Address addr)
    {
        manager = mgr;
        mgr.events(Event.Read);
        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setConnectHandler(&accept);
        mgr.setErrorHandler(&RawTCPHandler.onError);
        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
        mgr.listen(addr);
         
        sel.addConnection(mgr);
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpServer");
        log.info("log initialized");
        children = new CircularSeq!(Dispatcher);
    }
    this(Vat sel, IPv4Address addr)
    {
        AsyncSocketConduit cond = new AsyncSocketConduit;
        cond.socket().setAddressReuse(true);
        Dispatcher lh = new Dispatcher(cond, true);
        this(lh, sel, addr);
    }

    ~this()
    {
        close();
    } 

    int accept(Conduit cond, RegisterD reg)
    {
        AsyncSocketConduit newcond = new AsyncSocketConduit;
        (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
        Dispatcher h = Dispatcher.New(newcond, manager);
        h.events(Event.Read);
        vat.addConnection(h);
        children.append(h);
        log.info("accepted new connection");
        return 0;
    }

    int broadcast(char[] outbuf, Dispatcher[] excluded = null)
    {
        foreach(Dispatcher h; children)
        {
            if (excluded && excluded.includes(h))
                continue;
            if (h.appendOutBuffer(outbuf))
            {
                h.addEvent(Event.Write);
                vat.addConnection(h);
            }
        }
        return 0;
    }
    
    /**************************************************************************
    
        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free. 

    **************************************************************************/
    int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
    {
        if (d.appendOutBuffer(outbuf))
        {
            d.addEvent(Event.Write);
            if (!vat.addConnection(d))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

    /**************************************************************************

        receive
        IncomingHandlerD
        Default incoming data handler. Should be replaced with something useful.

    **************************************************************************/ 
    int onReceive(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onReceive");

        char inbuf[8192];
        int amt;
        if((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                children.remove(h);
                (cast(AsyncSocketConduit) h.transport).shutdown();
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }
 
    void close()
    {
        foreach(Dispatcher d; children)
        {
            (cast(AsyncSocketConduit)d.transport).shutdown();
            (cast(AsyncSocketConduit)d.transport).detach();
        }
        (cast(AsyncSocketConduit)manager.transport).shutdown();
        (cast(AsyncSocketConduit)manager.transport).detach(); 
        
    }
    
    void setDataHandler(void delegate(char[], Dispatcher) h)
    {
        dataHandler = h;
    }

private
    Vat vat;
    CircularSeq!(Dispatcher) children;
    Dispatcher manager;
    Logger log;
    RawTCPHandler h;
    void delegate(char[], Dispatcher) dataHandler;
}

class RawTCPClient
{

public
    this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
    {
        manager = mgr;
        manager.events(evts);
        connected = false;
        mgr.setOutgoingHandler(&RawTCPHandler.onSend);
        mgr.setIncomingHandler(&onReceive);
        mgr.setErrorHandler(&RawTCPHandler.onError);
        mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpClient");
    }

    this(Vat sel, Event evts = Event.Read)
    {
        AsyncSocketConduit clcond = new AsyncSocketConduit;
        Dispatcher ch = new Dispatcher(clcond);
        this(ch, sel, evts);
    }

    ~this()
    {
        (cast(AsyncSocketConduit)manager.transport).shutdown();
        (cast(AsyncSocketConduit)manager.transport).detach();
    }

    int connect(IPv4Address addr)
    {
        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
        vat.addConnection(manager);
        connected = true;
        log.info("connected to {}", addr);
        return 0;
    } 

    /**************************************************************************
    
        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free. 

    **************************************************************************/
    int send(char[] outbuf, IPv4Address addr = null)
    {
        if (!connected)
        {
            log.info("send: not connected, connecting");
            if (addr !is null)
            {
                if (0 > connect(addr))
                {
                    log.error("send: unable to connect");
                    return -1;
                }
            }
        }
        if (manager.appendOutBuffer(outbuf))
        {
            manager.addEvent(Event.Write);
            if (!vat.addConnection(manager))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }
    
    /**************************************************************************

        receive
        IncomingHandlerD
        Default incoming data handler. Should be replaced with something useful.

    **************************************************************************/ 
    public int onReceive(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onReceive");

        char inbuf[8192];
        int amt;
        if((amt = h.transport.read(inbuf)) > 0)
        {
            if (dataHandler)
                dataHandler(inbuf[0 .. amt], h);
            else
                log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
        }
        else
        {
            if (amt == 0)
            {
                return CLOSE;
            }
            log.error("Received no data, err = {}", amt);
        }
        return REMAIN;
    }
    
    void setDataHandler(void delegate(char[], Dispatcher) h)
    {
        dataHandler = h;
    }
private
    void delegate(char[], Dispatcher) dataHandler;
    Dispatcher manager;
    Vat vat;
    bool connected;
    Logger log;
    RawTCPHandler h;
}


/******************************************************************************

    Default Event handlers common to both listener/clients

******************************************************************************/
struct RawTCPHandler
{
    /**************************************************************************

        onSend
        OutgoingHandlerD 
        To be registered as the response to socket writable event. 
        Sends data, returns amount sent. Unregisters Handler for sending
        if there is no more data left to send. 

    ***************************************************************************/
    public static int onSend(Dispatcher h)
    {
        Logger log = Log.lookup("Handlers.onSend");
     
        char[] outbuf = h.nextBuffer();
        if (outbuf !is null)
        {
            int sent = h.transport.write(outbuf);
            if (sent > 0)
            {
                if (! h.addOffset(sent))
                {
                    h.remEvent(Event.Write);
                    return REREGISTER;
                }
            }
            else if (sent == AsyncSocketConduit.Eof)
            {
                log.error("Select said socket was writable, but sent 0 bytes");
            }
            else
            {
               log.error("Socket send return ERR");
            }
            return REMAIN;
        }
        else
        {
            h.remEvent(Event.Write);
            return REREGISTER;
        }
    }

    static int onHangup(Dispatcher d)
    {
        return UNREGISTER;
    }

    static int onError(Dispatcher d, RegisterD unreg)
    {
        return CLOSE;
    } 

}

bool includes(Dispatcher[] haystack, Dispatcher needle)
{
    foreach(Dispatcher h; haystack)
    {
        if (h is needle)
            return true;
    }
    return false;
}