Mercurial > projects > dreactor
view dreactor/core/Vat.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | |
children | d6a3cfe7c3de |
line wrap: on
line source
/******************************************************************************* copyright: Copyright (c) 2008 Rick Richardson. All rights reserved license: BSD style: $(LICENSE) version: Initial release v0.1 : May 2008 author: Rick Richardson *******************************************************************************/ module dreactor.core.Vat; import tango.io.selector.Selector; import tango.io.selector.model.ISelector; import tango.core.Exception; import tango.core.Thread; import tango.core.Atomic; import tango.util.collection.CircularSeq; import tango.util.log.Log; import dreactor.transport.AsyncSocketConduit; import dreactor.core.Task; import dreactor.util.ThreadSafeQueue; static char[] version_string = "Vat.d 0.1 2008-05-31"; Logger log; enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; alias Message delegate (Conduit c) HandlerDG; alias Message function (Conduit c) HandlerFN; class TaskAttachment { public Task task; HandlerDG dg; HandlerFN fn; this(Task ta, HandlerDG d) { TaskAttachment t; t.task = ta; t.dg = d; return t; } this(Task ta, HandlerFN f) { TaskAttachment t; t.task = ta; t.fn = f; return t; } public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); } } class Vat { private Thread thread; bool running; Task[int] tasks; int taskCount; public this(Task t) { addTask(t); this(); } this() { log = Log.lookup("dreactor.core.Vat"); running = true; thread = new Thread(&eventLoop); thread.start(); } void addTask(Task t) { t.setVat(this); ++taskCount; tasks[taskCount] = t; t.setId(taskCount); } void exit() { running = false; } void wait() { thread.join(); } bool addConnection() { log.trace("adding handler"); return selector.register(h.transport, h.events(), h); } bool remConnection(Dispatcher handler) { return selector.unregister(h.transport); } private void eventLoop() { auto selector = new Selector(); selector.open(); do { execTasks(); auto eventCount = selector.select(0.01); if (eventCount > 0) { // process events foreach (SelectionKey key; selector.selectedSet()) { if (key.isReadable()) { // incoming data log.trace("Read event fired"); auto conn = cast(Dispatcher) key.attachment; if ( Dispatcher.State.listening == conn.getState() ) conn.handleConnection(conn.transport, &addConnection); else processReturn(conn.handleIncoming(), selector, conn); } else if (key.isWritable()) { log.trace("Write event fired"); auto conn = cast(Dispatcher) key.attachment; processReturn(conn.handleOutgoing(), selector, conn); } else if (key.isHangup()) { log.trace("Hangup event fired"); auto conn = cast(Dispatcher) key.attachment; processReturn(conn.handleDisconnect(), selector, conn); } else if (key.isError() || key.isInvalidHandle()) { log.trace("Error event fired"); // error, close connection auto conn = cast(Dispatcher) key.attachment; conn.handleError(&remConnection); } } } else if (eventCount == 0) { /* can't think of anything useful to do here. */ } else { log.error("Selector.select returned {}", eventCount); } } while (running) } void execTasks() { foreach(int k; tasks.keys) { if (tasks[k].state() == Fiber.State.HOLD) tasks[k].call(); if (tasks[k].state() == Fiber.State.TERM) tasks.remove(k); } } void processReturn(int result, Selector s, Dispatcher h) { switch(result) { case CLOSE: s.unregister(h.transport); h.transport.detach(); break; case UNREGISTER: s.unregister(h.transport); break; case REMAIN: //this space intentially left blank break; case REGISTER: s.register(h.transport, h.events(), h); break; case REREGISTER: s.register(h.transport, h.events(), h); break; default: log.error("processReturn: unknown return value"); } } }