Mercurial > projects > dreactor
view dreactor/core/Vat.d @ 12:d6a3cfe7c3de
more stuff
author | rick@Macintosh.local |
---|---|
date | Wed, 27 Aug 2008 00:47:33 -0400 |
parents | 5836613d16ac |
children | 8c9b1276f623 |
line wrap: on
line source
/******************************************************************************* copyright: Copyright (c) 2008 Rick Richardson. All rights reserved license: BSD style: $(LICENSE) version: Initial release v0.1 : May 2008 author: Rick Richardson *******************************************************************************/ module dreactor.core.Vat; import tango.io.selector.Selector; import tango.io.selector.model.ISelector; import tango.core.Exception; import tango.core.Thread; import tango.core.Atomic; import tango.util.collection.CircularSeq; import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; import dreactor.protocol.IProvider; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; class TaskAttachment { public Task task; IProvider provider; this (Task ta, IProvider p) { task = ta; provider = p; } } class Vat { private Thread thread; bool running; Logger log; TaskAttachment[int] tasks; //registry for local tasks Selector selector; static Atomic!(int) taskCount; TaskAttachment[int] globalTasks; //global registry of tasks public this() { log = Log.lookup("dreactor.core.Vat"); running = true; thread = new Thread(&eventLoop); thread.start(); } synchronized int addTask(Task t, IProvider p = null) { t.setVat(this); ++taskCount; auto ta = new TaskAttachment(t, (p !is null) ? p : new DefaultProvider); tasks[taskCount] = ta; globalTask[taskCount] = ta; selector.register(p.getConduit(), p.getEvents(), ta); t.setId(taskCount); return taskCount; } void exit() { running = false; } void wait() { thread.join(); } bool addConnection(int tid, Conduit c, Events evts) { log.trace("adding handler"); TaskAttachment ta; if (ta = (tid in tasks)) return selector.register(c, evts, ta); else return false; } bool remConnection(Conduit c) { return selector.unregister(c); } Task getTask(int tid) { TaskAttachment ta; if (ta = (tid in tasks)) return ta.task; else return null; } static synchronized Task getGlobalTask(int tid) { TaskAttachment ta; if (ta = (tid in globaltasks)) return ta.task; else return null; } private void eventLoop() { selector = new Selector(); selector.open(); do { execTasks(); auto eventCount = selector.select(0.01); if (eventCount > 0) { // process events foreach (SelectionKey key; selector.selectedSet()) { if (key.isReadable()) { // incoming data log.trace("Read event fired"); auto ta = cast(TaskAttachment) key.attachment; processReturn(ta.appendMessage(ta.provider.handleRead()), key.conduit); } else if (key.isWritable()) { log.trace("Write event fired"); auto ta = cast(TaskAttachment) key.attachment; ta.appendMessage(ta.provider.handleWrite()); processReturn(ta.appendMessage(ta.provider.handleWrite()), key.conduit); } else if (key.isHangup()) { log.trace("Hangup event fired"); auto ta = cast(TaskAttachment) key.attachment; ta.appendMessage(ta.provider.handleDisconnect()); processReturn(ta.appendMessage(ta.provider.handleDisconnect()), key.conduit); } else if (key.isError() || key.isInvalidHandle()) { log.trace("Error event fired"); // error, close connection auto ta = cast(TaskAttachment) key.attachment; ta.appendMessage(ta.provider.handleError()); processReturn(ta.appendMessage(ta.provider.handleError()), key.conduit); } } } else if (eventCount == 0) { /* can't think of anything useful to do here. */ } else { log.error("Selector.select returned {}", eventCount); } } while (running) } void execTasks() { foreach(int k; tasks.keys) { if (tasks[k].state() == Fiber.State.HOLD) tasks[k].call(); if (tasks[k].state() == Fiber.State.TERM) tasks.remove(k); } } void processReturn(int result, Conduit c) { switch(result) { case CLOSE: selector.unregister(c); c.detach(); break; case UNREGISTER: selector.unregister(c); break; case REMAIN: //this space intentially left blank break; case REGISTER: break; case REREGISTER: break; default: log.error("processReturn: unknown return value"); } } }