Mercurial > projects > dreactor
view dreactor/core/Vat.d @ 13:8c9b1276f623 default tip
bug fixes
author | rick@minifunk |
---|---|
date | Sat, 20 Sep 2008 18:33:11 -0400 |
parents | d6a3cfe7c3de |
children |
line wrap: on
line source
/******************************************************************************* copyright: Copyright (c) 2008 Rick Richardson. All rights reserved license: BSD style: $(LICENSE) version: Initial release v0.1 : May 2008 author: Rick Richardson *******************************************************************************/ module dreactor.core.Vat; import tango.io.selector.Selector; import tango.io.selector.model.ISelector; import tango.core.Exception; import tango.core.Thread; import tango.core.Atomic; import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; import dreactor.protocol.IProvider; import dreactor.protocol.DefaultProvider; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; alias bool delegate (Event) RegDg; class TaskAttachment { public Task task; IProvider provider; this (Task ta, IProvider p) { task = ta; provider = p; } } class Vat { static Vat LocalVat; private Thread thread; bool running; Logger log; TaskAttachment[int] tasks; //registry for local tasks Selector selector; static Atomic!(int) taskCount; static TaskAttachment[int] globalTasks; //global registry of tasks public this() { log = Log.lookup("dreactor.core.Vat"); running = true; thread = new Thread(&eventLoop); thread.start(); } static this() { LocalVat = new Vat; } int addTask(Task t) { t.setVat(this); int taskid = taskCount.load() + 1; taskCount.store(taskid); auto p = t.getProvider(); if (p is null) p = new DefaultProvider; p.setRegisterFunc(createRegFunc(taskid)); //default the task id as a param in the delegate auto ta = new TaskAttachment(t, p); tasks[taskid] = ta; globalTasks[taskid] = ta; selector.register(p.getConduit(), p.getEvents(), ta); t.setId(taskid); return taskid; } void exit() { running = false; } void wait() { thread.join(); } bool register(int tid, Event evts) { log.trace("adding handler"); TaskAttachment* ta; if ((ta = (tid in tasks)) !is null) { selector.register((*ta).provider.getConduit(), evts, *ta); return true; } else { return false; } } RegDg createRegFunc(int taskid) { class Functor { int taskid; this (int tid) { taskid = tid; } bool call(Event evts) { return register(taskid, evts); } } auto ftor = new Functor(taskid); return &ftor.call; } bool remConnection(Conduit c) { selector.unregister(c); return true; } Task getTask(int tid) { TaskAttachment* ta; if ((ta = (tid in tasks)) !is null) return ta.task; else return null; } static Task getGlobalTask(int tid) { TaskAttachment* ta; if ((ta = (tid in globalTasks)) !is null) return ta.task; else return null; } private void eventLoop() { selector = new Selector(); selector.open(); do { execTasks(); auto eventCount = selector.select(0.01); if (eventCount > 0) { // process events foreach (SelectionKey key; selector.selectedSet()) { if (key.isReadable()) { // incoming data log.trace("Read event fired"); auto ta = cast(TaskAttachment) key.attachment; ta.task.appendMessage(ta.provider.handleRead()); } else if (key.isWritable()) { log.trace("Write event fired"); auto ta = cast(TaskAttachment) key.attachment; ta.task.appendMessage(ta.provider.handleWrite()); } else if (key.isHangup()) { log.trace("Hangup event fired"); auto ta = cast(TaskAttachment) key.attachment; ta.task.appendMessage(ta.provider.handleDisconnect()); } else if (key.isError() || key.isInvalidHandle()) { log.trace("Error event fired"); // error, close connection auto ta = cast(TaskAttachment) key.attachment; ta.task.appendMessage(ta.provider.handleError()); } } } else if (eventCount == 0) { /* can't think of anything useful to do here. */ } else { log.error("Selector.select returned {}", eventCount); } } while (running) } void execTasks() { foreach(int k; tasks.keys) { if (tasks[k].task.state() == Fiber.State.HOLD) tasks[k].task.call(); if (tasks[k].task.state() == Fiber.State.TERM) tasks.remove(k); } } }