Mercurial > projects > dreactor
comparison dreactor/core/Vat.d @ 11:5836613d16ac
reorg! reorg!
author | rick@minifunk |
---|---|
date | Tue, 12 Aug 2008 16:59:56 -0400 |
parents | |
children | d6a3cfe7c3de |
comparison
equal
deleted
inserted
replaced
10:e75a2e506b1d | 11:5836613d16ac |
---|---|
1 /******************************************************************************* | |
2 | |
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved | |
4 | |
5 license: BSD style: $(LICENSE) | |
6 | |
7 version: Initial release v0.1 : May 2008 | |
8 | |
9 author: Rick Richardson | |
10 | |
11 *******************************************************************************/ | |
12 | |
13 module dreactor.core.Vat; | |
14 | |
15 import tango.io.selector.Selector; | |
16 import tango.io.selector.model.ISelector; | |
17 import tango.core.Exception; | |
18 import tango.core.Thread; | |
19 import tango.core.Atomic; | |
20 import tango.util.collection.CircularSeq; | |
21 import tango.util.log.Log; | |
22 | |
23 import dreactor.transport.AsyncSocketConduit; | |
24 import dreactor.core.Task; | |
25 import dreactor.util.ThreadSafeQueue; | |
26 | |
27 static char[] version_string = "Vat.d 0.1 2008-05-31"; | |
28 | |
29 Logger log; | |
30 | |
31 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2}; | |
32 alias Message delegate (Conduit c) HandlerDG; | |
33 alias Message function (Conduit c) HandlerFN; | |
34 | |
35 class TaskAttachment | |
36 { | |
37 public | |
38 Task task; | |
39 HandlerDG dg; | |
40 HandlerFN fn; | |
41 | |
42 this(Task ta, HandlerDG d) | |
43 { TaskAttachment t; t.task = ta; t.dg = d; return t; } | |
44 | |
45 this(Task ta, HandlerFN f) | |
46 { TaskAttachment t; t.task = ta; t.fn = f; return t; } | |
47 | |
48 public Message opCall(Conduit c) { dg is null ? return fn() : return dg(c); } | |
49 } | |
50 | |
51 class Vat | |
52 { | |
53 private | |
54 Thread thread; | |
55 bool running; | |
56 | |
57 Task[int] tasks; | |
58 int taskCount; | |
59 | |
60 public | |
61 | |
62 this(Task t) | |
63 { | |
64 addTask(t); | |
65 this(); | |
66 } | |
67 | |
68 this() | |
69 { | |
70 log = Log.lookup("dreactor.core.Vat"); | |
71 | |
72 running = true; | |
73 thread = new Thread(&eventLoop); | |
74 thread.start(); | |
75 } | |
76 | |
77 void addTask(Task t) | |
78 { | |
79 t.setVat(this); | |
80 ++taskCount; | |
81 tasks[taskCount] = t; | |
82 t.setId(taskCount); | |
83 } | |
84 | |
85 void exit() | |
86 { | |
87 running = false; | |
88 } | |
89 | |
90 void wait() | |
91 { | |
92 thread.join(); | |
93 } | |
94 | |
95 bool addConnection() | |
96 { | |
97 log.trace("adding handler"); | |
98 return selector.register(h.transport, h.events(), h); | |
99 } | |
100 | |
101 bool remConnection(Dispatcher handler) | |
102 { | |
103 return selector.unregister(h.transport); | |
104 } | |
105 | |
106 private | |
107 void eventLoop() | |
108 { | |
109 auto selector = new Selector(); | |
110 selector.open(); | |
111 do | |
112 { | |
113 execTasks(); | |
114 auto eventCount = selector.select(0.01); | |
115 | |
116 if (eventCount > 0) | |
117 { | |
118 // process events | |
119 foreach (SelectionKey key; selector.selectedSet()) | |
120 { | |
121 if (key.isReadable()) | |
122 { | |
123 // incoming data | |
124 log.trace("Read event fired"); | |
125 auto conn = cast(Dispatcher) key.attachment; | |
126 if ( Dispatcher.State.listening == conn.getState() ) | |
127 conn.handleConnection(conn.transport, &addConnection); | |
128 else | |
129 processReturn(conn.handleIncoming(), selector, conn); | |
130 } | |
131 else if (key.isWritable()) | |
132 { | |
133 log.trace("Write event fired"); | |
134 auto conn = cast(Dispatcher) key.attachment; | |
135 processReturn(conn.handleOutgoing(), selector, conn); | |
136 } | |
137 else if (key.isHangup()) | |
138 { | |
139 log.trace("Hangup event fired"); | |
140 auto conn = cast(Dispatcher) key.attachment; | |
141 processReturn(conn.handleDisconnect(), selector, conn); | |
142 } | |
143 else if (key.isError() || key.isInvalidHandle()) | |
144 { | |
145 log.trace("Error event fired"); | |
146 // error, close connection | |
147 auto conn = cast(Dispatcher) key.attachment; | |
148 conn.handleError(&remConnection); | |
149 } | |
150 } | |
151 } | |
152 else if (eventCount == 0) | |
153 { | |
154 /* can't think of anything useful to do here. */ | |
155 } | |
156 else | |
157 { | |
158 log.error("Selector.select returned {}", eventCount); | |
159 } | |
160 | |
161 } while (running) | |
162 | |
163 } | |
164 | |
165 void execTasks() | |
166 { | |
167 foreach(int k; tasks.keys) | |
168 { | |
169 if (tasks[k].state() == Fiber.State.HOLD) | |
170 tasks[k].call(); | |
171 if (tasks[k].state() == Fiber.State.TERM) | |
172 tasks.remove(k); | |
173 } | |
174 } | |
175 | |
176 void processReturn(int result, Selector s, Dispatcher h) | |
177 { | |
178 switch(result) | |
179 { | |
180 case CLOSE: | |
181 s.unregister(h.transport); | |
182 h.transport.detach(); | |
183 break; | |
184 case UNREGISTER: | |
185 s.unregister(h.transport); | |
186 break; | |
187 case REMAIN: | |
188 //this space intentially left blank | |
189 break; | |
190 case REGISTER: | |
191 s.register(h.transport, h.events(), h); | |
192 break; | |
193 case REREGISTER: | |
194 s.register(h.transport, h.events(), h); | |
195 break; | |
196 default: | |
197 log.error("processReturn: unknown return value"); | |
198 } | |
199 } | |
200 } |