view dreactor/core/Vat.d @ 9:5412a1ff2e49

adding chat client and more updates
author rick@minifunk
date Sat, 12 Jul 2008 10:42:41 -0400
parents 60cf25102fb2
children
line wrap: on
line source

/*******************************************************************************

        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Initial release v0.1 : May 2008
        
        author:         Rick Richardson

*******************************************************************************/

module dreactor.core.Vat;

import tango.io.selector.Selector;
import tango.io.selector.model.ISelector;
import tango.core.Exception;
import tango.core.Thread;
import tango.core.Atomic;
import tango.util.collection.LinkSeq;
import tango.util.log.Log;

import dreactor.transport.AsyncSocketConduit;
import dreactor.core.Dispatcher;
import dreactor.util.ThreadSafeQueue;

/// Module-wide logger; assigned in Vat's constructor.
Logger log;

/**
 * Result codes a Dispatcher handler hands back to the event loop.
 * Vat.processReturn maps each code onto a selector action.
 */
enum : int
{
    CLOSE      = -2,  /// unregister the transport, then detach it
    UNREGISTER = -1,  /// unregister the transport only
    REMAIN     =  0,  /// no selector change
    REGISTER   =  1,  /// register the transport with the handler's events
    REREGISTER =  2   /// handled the same way as REGISTER by processReturn
}

/// Identifies this source file's revision.
static char[] version_string = "Vat.d 0.1 2008-05-31";

/**
 * A Vat owns one event-loop thread and one Selector. Dispatchers are queued
 * for registration/unregistration through thread-safe queues and are picked
 * up by the loop thread once per iteration.
 */
class Vat
{
private
    Thread thread;      // event-loop thread; null until run() is called
    bool running;       // loop keeps iterating while this is true
    Atomic!(int) pending;
 
    ThreadSafeQueue!(Dispatcher) freshList; // handlers waiting to be registered
    ThreadSafeQueue!(Dispatcher) remList;   // handlers waiting to be unregistered
public 
    /// Creates the two hand-off queues and looks up the module logger.
    this()
    {
        freshList = new ThreadSafeQueue!(Dispatcher);
        remList = new ThreadSafeQueue!(Dispatcher);
        log = Log.lookup("dreactor.core.Vat");
    }

    /**
     * Starts the event loop on a new thread.
     *
     * NOTE(review): calling run() twice starts a second loop thread and
     * drops the reference to the first one -- confirm callers never do so.
     */
    void run()
    {
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /// Asks the event loop to stop; it exits at the end of its current iteration.
    void exit()
    {
        running = false;
    }

    /**
     * Joins the event-loop thread. Does nothing if run() was never
     * called, instead of dereferencing a null thread.
     */
    void wait()
    {
        if (thread !is null)
            thread.join();
    }

    /**
     * Queues a handler to be registered with the selector by the loop thread.
     * Returns: whatever the queue's push() reports.
     */
    bool addConnection(Dispatcher handler)
    {
        log.trace("adding handler");
        return freshList.push(handler);       
    }
     
    /**
     * Queues a handler to be unregistered from the selector by the loop thread.
     * Returns: whatever the queue's push() reports.
     */
    bool remConnection(Dispatcher handler)
    {
        return remList.push(handler);
    }

private
    /**
     * Body of the loop thread: select, dispatch each selected key to its
     * Dispatcher, then drain the register/unregister queues. Runs at least
     * one iteration and repeats while `running` is true.
     */
    void eventLoop()
    {
        auto selector = new Selector();
        selector.open();
        // Release the selector's resources however the loop ends
        // (normal exit or an exception escaping a handler).
        scope(exit) selector.close();

        do
        {
            // Short timeout (presumably seconds -- confirm against Tango) so
            // that the queues below and the `running` flag are polled often.
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                foreach (SelectionKey key; selector.selectedSet())
                {
                    // Every registration made by this class attaches the Dispatcher.
                    auto conn = cast(Dispatcher) key.attachment;

                    // Only the first matching condition is handled per key per pass.
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");    
                        if ( Dispatcher.State.listening == conn.getState() )
                            conn.handleConnection(conn.transport, &addConnection);
                        else
                            processReturn(conn.handleIncoming(), selector, conn);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");    
                        processReturn(conn.handleOutgoing(), selector, conn);
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        processReturn(conn.handleDisconnect(), selector, conn);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired");    
                        // error, close connection
                        conn.handleError(&remConnection);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* can't think of anything useful to do here. */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }
            //add Conduits to listener
            freshList.processAll( (ref Dispatcher h)
            {
                selector.register(h.transport, h.events(), h);
                return 1; 
            });
            // drop Conduits queued for removal
            remList.processAll( (ref Dispatcher h)
            {
                selector.unregister(h.transport);
                return 1;
            });

        } while (running);

    }

    /**
     * Applies a handler's result code to the selector.
     * Params:
     *     result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
     *     s      = selector the handler's transport belongs to
     *     h      = handler that produced the result
     * Unknown codes are logged and otherwise ignored.
     */
    void processReturn(int result, Selector s, Dispatcher h)
    {
        switch(result)
        {
            case CLOSE:
                s.unregister(h.transport);
                h.transport.detach();
            break;
            case UNREGISTER:
                s.unregister(h.transport);
            break;
            case REMAIN:
                //this space intentionally left blank
            break;
            case REGISTER:
                s.register(h.transport, h.events(), h);
            break;
            case REREGISTER:
                // same call as REGISTER; presumably register() updates the
                // events of an already-registered conduit -- confirm
                s.register(h.transport, h.events(), h);
            break;
            default:
                log.error("processReturn: unknown return value");
        }
    }
}