view dreactor/core/Task.d @ 13:8c9b1276f623 default tip

bug fixes
author rick@minifunk
date Sat, 20 Sep 2008 18:33:11 -0400
parents d6a3cfe7c3de
children
line wrap: on
line source

module dreactor.core.Task;

import tango.core.Thread;
import tango.util.container.HashMap;
import tango.util.container.CircularList;
import tango.core.Atomic;

import dreactor.core.Vat;
import dreactor.protocol.IProvider;
import dreactor.protocol.DefaultProvider;

// Queue of pending messages; Mailbox keeps one such list per message type.
alias CircularList!(Message) Messages;

/*******************************************************************************
    Mailbox
    Holds pending messages grouped by message type: a hash map from the
    integer type to a list of messages of that type. A running total of
    queued messages is kept in an atomic counter and exposed via count().

    Only the counter is atomic; the map and the lists are not protected by
    this class, so callers sharing a Mailbox between threads must serialize
    push/pop themselves.
*******************************************************************************/
class Mailbox
{
public

    this () { box = new HashMap!(int, Messages); }

    /***************************************************************************
        Removes and returns the oldest queued message of the given type.
        Returns a default-initialized Message when nothing of that type is
        queued. A per-type list that is (or becomes) empty is dropped from
        the map.
    ***************************************************************************/
    Message popMessageOfType(int type)
    {
        Messages m;
        if (box.get(type, m))
        {
            if (!m.isEmpty())
            {
                Message msg = m.removeHead();
                // single atomic step instead of a separate load and store
                msg_count.decrement();

                if (m.isEmpty())
                    box.removeKey(type);
                return msg;
            }
            else
                box.removeKey(type);
        }
        Message msg;
        return msg;
    }

    //TODO this could be optimized to use set intersection logic instead of checking for 
    //multiple keys one at a time. 

    /***************************************************************************
        Tries each type in order and returns the first message whose valid
        flag is set; returns a default-initialized Message if none is found.
    ***************************************************************************/
    Message popMessageOfType(int[] types)
    {
        foreach(int i; types)
        {
            Message msg = popMessageOfType(i);
            if (msg.valid)
                return msg;
        }
        Message msg;
        return msg;
    }

    /***************************************************************************
        Removes and returns a message of whichever type the map iterator
        yields first. Empty per-type lists met on the way are removed.
        Returns a default-initialized Message when the mailbox is empty.
    ***************************************************************************/
    Message popMessage()
    {
        Messages m;
        int key;
        auto itor = box.iterator;

        while (true)
        {
            if (itor.valid && itor.next(key, m))
            {
                if (!m.isEmpty())
                {
                    Message msg = m.removeHead();
                    // push() counts every message regardless of its valid
                    // flag, so every removal must be counted too; otherwise
                    // count() drifts upward.
                    msg_count.decrement();
                    if (m.isEmpty())
                        box.removeKey(key); // safe: we return before touching itor again
                    return msg;
                }
                else 
                {
                    itor.remove();
                }
            }
            else
            {
                Message msg; 
                return msg;
            }
        }
    }

    /***************************************************************************
        Appends msg to the list for msg.type, creating that list on first
        use, and bumps the message counter.
    ***************************************************************************/
    void push(Message msg)
    {
        Messages m;
        if (box.get(msg.type, m))
            m.append(msg);
        else
        {
            m = new Messages;
            m.append(msg);
            box.add(msg.type, m);
        }
        msg_count.increment();
    }

    /***************************************************************************
        Returns the number of messages currently queued across all types.
    ***************************************************************************/
    int count()
    {
        return msg_count.load();
    }
private
    HashMap!(int, Messages) box;    // message type -> queued messages of that type
    Atomic!(int) msg_count;         // total queued messages across all lists
}

// Message handler signature: Task.run() invokes a delegate of this type once per received message.
alias void delegate (Message) TaskDg;
/*******************************************************************************
    Task
    A fiber with two mailboxes: `mailbox`, filled through appendMessage(),
    and `lockedMailbox`, filled through the synchronized appendIVMessage().
    The fiber body (run) repeatedly receives a message and hands it to the
    task's handler delegate; receive() yields the fiber while no message is
    available.
*******************************************************************************/
class Task
{
private
    Fiber fiber;
    Mailbox mailbox;        // filled by appendMessage (no locking)
    Mailbox lockedMailbox;  // filled by appendIVMessage under this object's monitor
    int id;
    Vat vat;
    TaskDg taskdg;
    IProvider provider;

public
    /***************************************************************************
        Creates the fiber and both mailboxes and registers the task with
        Vat.LocalVat.

        Params:
            prov = provider to use; a DefaultProvider is created when null
            dg   = handler invoked by run() for every received message. The
                   default run() asserts that it is non-null.
    ***************************************************************************/
    this(IProvider prov = null, TaskDg dg = null) 
    {
        fiber = new Fiber(&run, 4096 * 4);
        mailbox = new Mailbox;
        lockedMailbox = new Mailbox;
        provider = prov ? prov : new DefaultProvider;
        // must be in place before the task is handed to the vat
        taskdg = dg;
         
        Vat.LocalVat.addTask(this);
    }

    void setId(int i)
    {
        id = i;
    }

    /// Queues m in the unlocked mailbox.
    void  appendMessage(Message m) 
    { 
        mailbox.push(m); 
    }

    /// Queues m in the locked mailbox while holding this object's monitor.
    synchronized void appendIVMessage(Message m)
    {
        lockedMailbox.push(m);
    }

    void setVat(Vat v)
    {
        vat = v;
    }

    IProvider getProvider()
    {
        return provider;
    }

    /***************************************************************************
        run
        Fiber entry point: passes every message returned by receive() to the
        handler delegate. receive() only returns valid messages, so the loop
        runs for as long as the fiber keeps being resumed.
    ***************************************************************************/
    void run()
    in
    {
        assert(taskdg !is null);
    }
    body
    {
        Message msg;
        while ((msg = receive()).valid)
        {
            taskdg(msg);
        }
    }

    /***************************************************************************
        sendTo
        Basic message passing utility for inter-task communication. 
        It first checks the local Vat to see if the task is present, if not
        it gets the task from the global registry and sends a message to its 
        thread-safe mailbox. 

        Returns: true if a task with the given id was found and the message
                 was queued, false otherwise.
    ****************************************************************************/
 
    bool sendTo(int taskid, Message m)
    {
        Task t;
        // No vat assigned yet: skip the local lookup instead of
        // dereferencing null, and fall through to the global registry.
        if (vat !is null && (t = vat.getTask(taskid)) !is null)
        {
            t.appendMessage(m);
            return true;
        }
        else if ((t = Vat.getGlobalTask(taskid)) !is null)
        {
            t.appendIVMessage(m);
            return true;
        }
        return false;
    }

    /// Views the message payload as a char[] of msg.info elements (no copy is made).
    char[] getString(Message msg)
    {
        return (cast(char*) msg.payload)[0 .. msg.info];
    }

    Fiber.State state()
    {
        return fiber.state();
    }

    /// Resumes the task's fiber.
    void call()
    {
        fiber.call();
    }
protected

    /***************************************************************************
        receive
        User-called function to get the next pending message in the mailbox. 
        If there are no pending messages, this will yield control back to 
        the vat's scheduler. 

        The unlocked mailbox is checked first; when it has nothing suitable
        the locked mailbox (messages queued by appendIVMessage) is checked,
        so those messages are delivered as well.

        Params:
            types = only messages whose type is listed here are returned
    ***************************************************************************/

    Message receive(int[] types) 
    {
        while(true)
        {
            Message m = mailbox.popMessageOfType(types);
            if (!m.valid)
                m = popIVMessageOfType(types);
            if (m.valid)
                return m;
            Fiber.yield();
        }
    }

    /// As receive(int[]), but accepts a message of any type.
    Message receive()
    {
        while(true)
        {
            Message m = mailbox.popMessage();
            if (!m.valid)
                m = popIVMessage();
            if (m.valid)
                return m;
            Fiber.yield();
        }
    }

    int getId() { return id;}

private

    /***************************************************************************
        Pops one message from the locked mailbox, or returns a
        default-initialized Message when it is empty. The counter is read
        first so the monitor is only taken when something is queued; the pop
        itself runs under the same monitor as appendIVMessage.
    ***************************************************************************/
    Message popIVMessage()
    {
        Message m;
        if (lockedMailbox.count() > 0)
        {
            synchronized (this)
            {
                m = lockedMailbox.popMessage();
            }
        }
        return m;
    }

    /// As popIVMessage, restricted to the given message types.
    Message popIVMessageOfType(int[] types)
    {
        Message m;
        if (lockedMailbox.count() > 0)
        {
            synchronized (this)
            {
                m = lockedMailbox.popMessageOfType(types);
            }
        }
        return m;
    }
    
}