Mercurial > projects > dreactor
diff dreactor/core/Vat.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | |
children | d6a3cfe7c3de |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/dreactor/core/Vat.d Tue Aug 12 16:59:56 2008 -0400 @@ -0,0 +1,200 @@ +/******************************************************************************* + + copyright: Copyright (c) 2008 Rick Richardson. All rights reserved + + license: BSD style: $(LICENSE) + + version: Initial release v0.1 : May 2008 + + author: Rick Richardson + +*******************************************************************************/ + +module dreactor.core.Vat; + +import tango.io.selector.Selector; +import tango.io.selector.model.ISelector; +import tango.core.Exception; +import tango.core.Thread; +import tango.core.Atomic; +import tango.util.collection.CircularSeq; +import tango.util.log.Log; + +import dreactor.transport.AsyncSocketConduit; +import dreactor.core.Task; +import dreactor.util.ThreadSafeQueue; + +static char[] version_string = "Vat.d 0.1 2008-05-31"; + +Logger log; + +enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; +alias Message delegate (Conduit c) HandlerDG; +alias Message function (Conduit c) HandlerFN; + +class TaskAttachment +{ +public + Task task; + HandlerDG dg; + HandlerFN fn; + + this(Task ta, HandlerDG d) + { TaskAttachment t; t.task = ta; t.dg = d; return t; } + + this(Task ta, HandlerFN f) + { TaskAttachment t; t.task = ta; t.fn = f; return t; } + + public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); } +} + +class Vat +{ +private + Thread thread; + bool running; + + Task[int] tasks; + int taskCount; + +public + + this(Task t) + { + addTask(t); + this(); + } + + this() + { + log = Log.lookup("dreactor.core.Vat"); + + running = true; + thread = new Thread(&eventLoop); + thread.start(); + } + + void addTask(Task t) + { + t.setVat(this); + ++taskCount; + tasks[taskCount] = t; + t.setId(taskCount); + } + + void exit() + { + running = false; + } + + void wait() + { + thread.join(); + } + + bool addConnection() + { + log.trace("adding handler"); + return selector.register(h.transport, h.events(), h); + } + + bool remConnection(Dispatcher handler) + { + return selector.unregister(h.transport); + } + +private + void eventLoop() + { + auto selector = new Selector(); + selector.open(); + do + { + execTasks(); + auto eventCount = selector.select(0.01); + + if (eventCount > 0) + { + // process events + foreach (SelectionKey key; selector.selectedSet()) + { + if (key.isReadable()) + { + // incoming data + log.trace("Read event fired"); + auto conn = cast(Dispatcher) key.attachment; + if ( Dispatcher.State.listening == conn.getState() ) + conn.handleConnection(conn.transport, &addConnection); + else + processReturn(conn.handleIncoming(), selector, conn); + } + else if (key.isWritable()) + { + log.trace("Write event fired"); + auto conn = cast(Dispatcher) key.attachment; + processReturn(conn.handleOutgoing(), selector, conn); + } + else if (key.isHangup()) + { + log.trace("Hangup event fired"); + auto conn = cast(Dispatcher) key.attachment; + processReturn(conn.handleDisconnect(), selector, conn); + } + else if (key.isError() || key.isInvalidHandle()) + { + log.trace("Error event fired"); + // error, close connection + auto conn = cast(Dispatcher) key.attachment; + conn.handleError(&remConnection); + } + } + } + else if (eventCount == 0) + { + /* can't think of anything useful to do here. */ + } + else + { + log.error("Selector.select returned {}", eventCount); + } + + } while (running) + + } + + void execTasks() + { + foreach(int k; tasks.keys) + { + if (tasks[k].state() == Fiber.State.HOLD) + tasks[k].call(); + if (tasks[k].state() == Fiber.State.TERM) + tasks.remove(k); + } + } + + void processReturn(int result, Selector s, Dispatcher h) + { + switch(result) + { + case CLOSE: + s.unregister(h.transport); + h.transport.detach(); + break; + case UNREGISTER: + s.unregister(h.transport); + break; + case REMAIN: + //this space intentially left blank + break; + case REGISTER: + s.register(h.transport, h.events(), h); + break; + case REREGISTER: + s.register(h.transport, h.events(), h); + break; + default: + log.error("processReturn: unknown return value"); + } + } +}