view dreactor/core/Vat.d @ 12:d6a3cfe7c3de

more stuff
author rick@Macintosh.local
date Wed, 27 Aug 2008 00:47:33 -0400
parents 5836613d16ac
children 8c9b1276f623
line wrap: on
line source

/*******************************************************************************

        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Initial release v0.1 : May 2008
        
        author:         Rick Richardson

*******************************************************************************/

module dreactor.core.Vat;

import tango.io.selector.Selector;
import tango.io.selector.model.ISelector;
import tango.core.Exception;
import tango.core.Thread;
import tango.core.Atomic;
import tango.util.collection.CircularSeq;
import tango.util.log.Log;

import dreactor.transport.AsyncSocketConduit;
import dreactor.protocol.IProvider;
import dreactor.core.Task;
import dreactor.util.ThreadSafeQueue;

// Human-readable identification of this module and its revision.
static char[] version_string = "Vat.d 0.1 2008-05-31";


/*
 * Result codes interpreted by Vat.processReturn() to decide what happens to
 * the conduit an event arrived on:
 *
 *   CLOSE       the conduit is unregistered from the selector and detached
 *   UNREGISTER  the conduit is unregistered from the selector
 *   REMAIN      nothing is changed
 *   REGISTER    accepted, but no action is taken for it in this file
 *   REREGISTER  accepted, but no action is taken for it in this file
 */
enum : int
{
    CLOSE      = -2,
    UNREGISTER = -1,
    REMAIN     =  0,
    REGISTER   =  1,
    REREGISTER =  2
};

/*
 * Pairs a Task with the IProvider that services its I/O. Vat hands an
 * instance of this class to the selector as the attachment of a registered
 * conduit and casts it back when an event for that conduit is selected.
 */
class TaskAttachment
{
    public Task task;           // the task associated with the conduit
    public IProvider provider;  // the provider whose handlers are invoked on events

    /* Stores both references as given; neither is checked for null. */
    public this (Task ta, IProvider p)
    {
        this.task = ta;
        this.provider = p;
    }
}

/*******************************************************************************

        A Vat owns one worker thread. That thread runs eventLoop(): on every
        pass it resumes the tasks registered with this Vat and then polls a
        Selector for I/O events. Each selected event is handed to the provider
        attached to the conduit, the provider's result is appended to the
        owning task, and the code returned by the task decides what happens to
        the conduit (see processReturn()).

        NOTE(review): the bare "private" / "public" labels below have no
        trailing colon, so each one applies only to the declaration that
        immediately follows it. They are kept exactly as they were so that the
        visibility of every member is unchanged -- confirm whether "private:"
        and "public:" were intended.

        NOTE(review): only addTask() and getGlobalTask() are synchronized; the
        worker thread reads and modifies `tasks` without holding a lock.

*******************************************************************************/

class Vat
{
private
    Thread thread;
    bool running;
    Logger log;
 
    TaskAttachment[int] tasks; //registry for local tasks

    Selector selector;
    static Atomic!(int) taskCount;
    // Global registry of tasks. It is static because it is read from the
    // static getGlobalTask(), which has no instance to read it from.
    static TaskAttachment[int] globalTasks;

public 

    /***************************************************************************

            Creates the selector, then starts the worker thread.

            The selector is opened here rather than inside eventLoop() so that
            addTask() / addConnection() can never run against a selector that
            the freshly started thread has not created yet.

    ***************************************************************************/

    this()
    {
        log = Log.lookup("dreactor.core.Vat");

        selector = new Selector();
        selector.open();
        
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }
     
    /***************************************************************************

            Registers a task with this Vat and with the global registry, and
            registers its provider's conduit with the selector.

            Params:
            t = the task to add; its vat and id are set by this call
            p = the provider servicing the task's I/O; when null a
                DefaultProvider is created and used instead

            Returns:
            the id assigned to the task

    ***************************************************************************/

    synchronized int addTask(Task t, IProvider p = null)
    {
        t.setVat(this);
        ++taskCount;
        // Read the counter once so the same id is used for both registries,
        // for the task itself and for the return value, even if another Vat
        // bumps the counter in the meantime.
        // NOTE(review): increment-then-read is still two separate steps, and
        // Atomic!(int) may require explicit increment/load calls instead of
        // the operators used here -- confirm against tango.core.Atomic.
        int id = taskCount;

        auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider);
        tasks[id] = ta;

        // The method-level lock is per instance but globalTasks is shared by
        // all instances. Presumably Vat.classinfo is the monitor the static
        // synchronized getGlobalTask() uses -- TODO confirm.
        synchronized (Vat.classinfo)
        {
            globalTasks[id] = ta;
        }

        // Go through ta.provider, not p: p is null whenever the default
        // provider was substituted above.
        selector.register(ta.provider.getConduit(), ta.provider.getEvents(), ta);
        t.setId(id);
        return id;
    }
 
    /** Asks the event loop to stop; it exits after finishing its current pass. */
    void exit()
    {
        running = false;
    }

    /** Blocks the caller until the worker thread has ended. */
    void wait()
    {
        thread.join();
    }

    /***************************************************************************

            Registers an additional conduit on behalf of an existing task.

            Params:
            tid  = id of a task previously added to this Vat
            c    = the conduit to watch
            evts = the events to watch for on that conduit

            Returns:
            false when no task with that id is registered with this Vat,
            true once the conduit has been handed to the selector

    ***************************************************************************/

    bool addConnection(int tid, Conduit c, Events evts)
    {
        log.trace("adding handler");
        // `in` on an associative array yields a pointer to the stored value,
        // or null when the key is absent.
        auto entry = tid in tasks;
        if (entry is null)
            return false;

        // NOTE(review): ISelector.register is believed to return void, so its
        // result cannot be returned directly; failures would surface as
        // exceptions -- confirm against the Tango version in use.
        selector.register(c, evts, *entry);
        return true;
    }
     
    /***************************************************************************

            Removes a conduit from the selector.

            Returns:
            true once the selector call has returned
            NOTE(review): presumably unregister reports a conduit that was
            never registered by throwing -- confirm.

    ***************************************************************************/

    bool remConnection(Conduit c)
    {
        selector.unregister(c);
        return true;
    }

    /** Returns the task with the given id owned by this Vat, or null. */
    Task getTask(int tid)
    {
        auto entry = tid in tasks;
        if (entry is null)
            return null;
        return (*entry).task;
    }

    /** Returns the task with the given id from the global registry, or null. */
    static synchronized Task getGlobalTask(int tid)
    {
        auto entry = tid in globalTasks;
        if (entry is null)
            return null;
        return (*entry).task;
    }

private
    /***************************************************************************

            Body of the worker thread. Until exit() is called it repeatedly
            resumes the registered tasks and polls the selector with a 0.01
            timeout, dispatching at most one handler per selected key: read,
            else write, else hangup, else error / invalid handle.

            Each handler is invoked exactly once per event; its result is
            appended to the task, and the code returned from that is passed to
            processReturn().

            NOTE(review): assumes Task provides appendMessage() returning one
            of the CLOSE .. REREGISTER codes -- TaskAttachment itself has no
            such member. Confirm against dreactor.core.Task.

    ***************************************************************************/

    void eventLoop()
    {
        do
        {
            execTasks();
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                foreach (SelectionKey key; selector.selectedSet())
                {
                    // every conduit is registered with a TaskAttachment
                    auto ta = cast(TaskAttachment) key.attachment;

                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");    
                        processReturn(ta.task.appendMessage(ta.provider.handleRead()), key.conduit);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");    
                        processReturn(ta.task.appendMessage(ta.provider.handleWrite()), key.conduit);
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        processReturn(ta.task.appendMessage(ta.provider.handleDisconnect()), key.conduit);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired"); 
                        // error, close connection
                        processReturn(ta.task.appendMessage(ta.provider.handleError()), key.conduit);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* timed out with nothing selected; nothing to do */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

        } while (running);

    }

    /***************************************************************************

            Resumes every task of this Vat that is on hold and drops those
            that have terminated from the local registry.

            NOTE(review): assumes Task derives from Fiber, as the comparison
            against Fiber.State suggests -- confirm. Terminated tasks are not
            removed from the global registry here.

    ***************************************************************************/

    void execTasks()
    {
        // Iterate over the array returned by .keys rather than the map itself,
        // since entries are removed inside the loop.
        foreach(int k; tasks.keys)
        {
            auto task = tasks[k].task;
            if (task.state() == Fiber.State.HOLD)
                task.call();
            if (task.state() == Fiber.State.TERM)
                tasks.remove(k);
        }        
    }

    /***************************************************************************

            Applies the code returned by a task to the conduit the event
            arrived on.

            Params:
            result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
            c      = the conduit the event was selected for

    ***************************************************************************/

    void processReturn(int result, Conduit c)
    {
        switch(result)
        {
            case CLOSE:
                selector.unregister(c);
                c.detach();
            break;
            case UNREGISTER:
                selector.unregister(c);
            break;
            case REMAIN:
                // this space intentionally left blank
            break;
            case REGISTER:
                // no action implemented
            break;
            case REREGISTER:
                // no action implemented
            break;
            default:
                log.error("processReturn: unknown return value");
            break;
        }
    }
}