view dreactor/core/Vat.d @ 13:8c9b1276f623 default tip

bug fixes
author rick@minifunk
date Sat, 20 Sep 2008 18:33:11 -0400
parents d6a3cfe7c3de
children
line wrap: on
line source

/*******************************************************************************

        copyright:      Copyright (c) 2008 Rick Richardson. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        Initial release v0.1 : May 2008
        
        author:         Rick Richardson

*******************************************************************************/

module dreactor.core.Vat;

import tango.io.selector.Selector;
import tango.io.selector.model.ISelector;
import tango.core.Exception;
import tango.core.Thread;
import tango.core.Atomic;
import tango.util.log.Log;

import dreactor.transport.AsyncSocketConduit;
import dreactor.protocol.IProvider;
import dreactor.protocol.DefaultProvider;
import dreactor.core.Task;
import dreactor.util.ThreadSafeQueue;

static char[] version_string = "Vat.d 0.1 2008-05-31";

alias bool delegate (Event) RegDg;

class TaskAttachment
{
public
    Task task;
    IProvider provider;

    this (Task ta, IProvider p) 
    { task = ta; provider = p; }
}

class Vat
{
    static Vat LocalVat;

private
    Thread thread;
    bool running;
    Logger log;
 
    TaskAttachment[int] tasks; //registry for local tasks

    Selector selector;
    static Atomic!(int) taskCount;
    static TaskAttachment[int] globalTasks; //global registry of tasks

public 

    this()
    {
        log = Log.lookup("dreactor.core.Vat");
        
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    static this()
    {
        LocalVat = new Vat;
    }

    int addTask(Task t)
    {
        t.setVat(this);
        int taskid = taskCount.load() + 1;
        taskCount.store(taskid);
        auto p = t.getProvider();
        if (p is null)
            p = new DefaultProvider;
        p.setRegisterFunc(createRegFunc(taskid));  //default the task id as a param in the delegate
        auto ta = new TaskAttachment(t, p);
        tasks[taskid] = ta;
        globalTasks[taskid] = ta;
        selector.register(p.getConduit(), p.getEvents(), ta);
        t.setId(taskid);

        return taskid;
    }
 
    void exit()
    {
        running = false;
    }

    void wait()
    {
        thread.join();
    }

    bool register(int tid, Event evts)
    {
        log.trace("adding handler");
        TaskAttachment* ta;
        if ((ta = (tid in tasks)) !is null)
        {
            selector.register((*ta).provider.getConduit(), evts, *ta);
            return true;
        }
        else
        {
            return false;
        }
    }
    
    RegDg createRegFunc(int taskid)
    {
        class Functor
        {   
            int taskid;
            this (int tid)
            {
                taskid = tid;
            }
            bool call(Event evts)
            {
                return register(taskid, evts);    
            }
        }
        auto ftor = new Functor(taskid);
        return &ftor.call;
    }   

    bool remConnection(Conduit c)
    {
        selector.unregister(c);
        return true;
    }

    Task getTask(int tid)
    {
        TaskAttachment* ta;
        if ((ta = (tid in tasks)) !is null)
            return ta.task;
        else
            return null;
    }

    static Task getGlobalTask(int tid)
    {
        TaskAttachment* ta;
        if ((ta = (tid in globalTasks)) !is null)
            return ta.task;
        else
            return null;
    }

private
    void eventLoop()
    {
        selector = new Selector();
        selector.open();
        do
        {
            execTasks();
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                foreach (SelectionKey key; selector.selectedSet())
                {
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");    
                        auto ta = cast(TaskAttachment) key.attachment;
                        ta.task.appendMessage(ta.provider.handleRead());

                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");    
                        auto ta = cast(TaskAttachment) key.attachment;
                        ta.task.appendMessage(ta.provider.handleWrite());
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        auto ta = cast(TaskAttachment) key.attachment;
                        ta.task.appendMessage(ta.provider.handleDisconnect());
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired"); 
                        // error, close connection
                        auto ta = cast(TaskAttachment) key.attachment;
                        ta.task.appendMessage(ta.provider.handleError());
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* can't think of anything useful to do here. */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

        } while (running)

    }

    void execTasks()
    {
        foreach(int k; tasks.keys)
        {
            if (tasks[k].task.state() == Fiber.State.HOLD)
                tasks[k].task.call();
            if (tasks[k].task.state() == Fiber.State.TERM)
                tasks.remove(k);
        }        
    }

}