comparison dreactor/core/Vat.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents
children d6a3cfe7c3de
comparison
equal deleted inserted replaced
10:e75a2e506b1d 11:5836613d16ac
1 /*******************************************************************************
2
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved
4
5 license: BSD style: $(LICENSE)
6
7 version: Initial release v0.1 : May 2008
8
9 author: Rick Richardson
10
11 *******************************************************************************/
12
13 module dreactor.core.Vat;
14
15 import tango.io.selector.Selector;
16 import tango.io.selector.model.ISelector;
17 import tango.core.Exception;
18 import tango.core.Thread;
19 import tango.core.Atomic;
20 import tango.util.collection.CircularSeq;
21 import tango.util.log.Log;
22
23 import dreactor.transport.AsyncSocketConduit;
24 import dreactor.core.Task;
25 import dreactor.util.ThreadSafeQueue;
26
// Revision tag for this module.
static char[] version_string = "Vat.d 0.1 2008-05-31";

// Module-wide logger; Vat's constructor assigns it via Log.lookup.
Logger log;

// Result codes that Vat.processReturn switches on.
enum : int
{
    CLOSE      = -2,    // unregister the transport, then detach it
    UNREGISTER = -1,    // unregister the transport only
    REMAIN     =  0,    // leave the registration untouched
    REGISTER   =  1,    // register the transport with the selector
    REREGISTER =  2     // handled exactly like REGISTER
}

// Handler callbacks: both take a Conduit and produce a Message.
alias Message delegate(Conduit c) HandlerDG;
alias Message function(Conduit c) HandlerFN;
34
/**
 * Bundles a Task with the handler that should be run for a Conduit.
 *
 * Exactly one of `dg` / `fn` is set by the constructors; opCall dispatches
 * to whichever one is present.
 */
class TaskAttachment
{
public
    Task task;
    HandlerDG dg;
    HandlerFN fn;

    /**
     * Attach a delegate handler to a task.
     *
     * Params:
     *     ta = task this attachment belongs to
     *     d  = delegate invoked by opCall
     */
    this(Task ta, HandlerDG d)
    {
        // Initialise this instance directly; a constructor neither creates
        // a second object nor returns a value.
        task = ta;
        dg = d;
    }

    /**
     * Attach a plain-function handler to a task.
     *
     * Params:
     *     ta = task this attachment belongs to
     *     f  = function invoked by opCall
     */
    this(Task ta, HandlerFN f)
    {
        task = ta;
        fn = f;
    }

    /**
     * Invoke the stored handler with the given conduit.
     *
     * Params:
     *     c = conduit forwarded to the handler
     *
     * Returns: whatever the handler returns. The delegate is preferred; the
     * function is used only when no delegate was supplied.
     */
    public Message opCall(Conduit c)
    {
        if (dg !is null)
            return dg(c);

        assert(fn !is null, "TaskAttachment has neither delegate nor function");
        return fn(c);
    }
}
50
/**
 * Runs a set of Tasks and a selector-driven event loop on a dedicated thread.
 *
 * The constructor opens the selector and starts the thread. Each pass of the
 * loop first resumes every held task, then polls the selector and routes
 * ready keys to the Dispatcher attached to them.
 *
 * NOTE(review): `running`, `tasks` and `taskCount` are touched both by the
 * loop thread and by whoever calls exit()/addTask(); no synchronisation is
 * done here — confirm that callers only use these from safe points.
 */
class Vat
{
private
    Thread thread;
    bool running;

    Task[int] tasks;
    int taskCount;

    // Shared by eventLoop(), addConnection() and remConnection(), so it must
    // live on the instance rather than as a local of the loop.
    private Selector selector;

public

    /**
     * Create a Vat that already owns one task, then start its thread.
     *
     * Params:
     *     t = first task to schedule
     */
    this(Task t)
    {
        // Added before the thread starts so the first pass already sees it.
        addTask(t);
        this();
    }

    /**
     * Create a Vat, open its selector and start the event-loop thread.
     */
    this()
    {
        log = Log.lookup("dreactor.core.Vat");

        // Opened here (not in the loop) so addConnection()/remConnection()
        // never observe a null selector.
        selector = new Selector();
        selector.open();

        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /**
     * Hand a task to this Vat.
     *
     * The task is told which Vat owns it and receives the next sequential
     * id, which is also its key in the task table.
     *
     * Params:
     *     t = task to schedule
     */
    void addTask(Task t)
    {
        t.setVat(this);
        ++taskCount;
        tasks[taskCount] = t;
        t.setId(taskCount);
    }

    /**
     * Ask the event loop to stop; it checks the flag at the end of each pass.
     */
    void exit()
    {
        running = false;
    }

    /**
     * Block until the event-loop thread has finished.
     */
    void wait()
    {
        thread.join();
    }

    /**
     * Register a dispatcher's transport with the selector.
     *
     * Params:
     *     handler = dispatcher whose transport and event mask are registered;
     *               it is also stored as the key's attachment
     *
     * Returns: the result of the selector's register call.
     */
    bool addConnection(Dispatcher handler)
    {
        log.trace("adding handler");
        return selector.register(handler.transport, handler.events(), handler);
    }

    /**
     * Remove a dispatcher's transport from the selector.
     *
     * Params:
     *     handler = dispatcher whose transport is unregistered
     *
     * Returns: the result of the selector's unregister call.
     */
    bool remConnection(Dispatcher handler)
    {
        return selector.unregister(handler.transport);
    }

private
    /**
     * Thread body: alternate between running tasks and servicing selector
     * events until `running` is cleared.
     */
    void eventLoop()
    {
        // Release the selector however the loop ends (normal exit or throw).
        scope (exit) selector.close();

        do
        {
            execTasks();

            // Short timeout so held tasks are resumed regularly
            // (presumably seconds — confirm against the selector API).
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                foreach (SelectionKey key; selector.selectedSet())
                {
                    auto conn = cast(Dispatcher) key.attachment;
                    if (conn is null)
                    {
                        // Nothing to dispatch to; skip rather than crash.
                        log.error("selection key has no Dispatcher attachment");
                        continue;
                    }

                    // Only the first matching condition is handled per key.
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");
                        if (Dispatcher.State.listening == conn.getState())
                            conn.handleConnection(conn.transport, &addConnection);
                        else
                            processReturn(conn.handleIncoming(), selector, conn);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");
                        processReturn(conn.handleOutgoing(), selector, conn);
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        processReturn(conn.handleDisconnect(), selector, conn);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired");
                        // error, close connection
                        conn.handleError(&remConnection);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* timeout with nothing ready: nothing to do */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

        } while (running);
    }

    /**
     * Resume every held task once and drop those that have terminated.
     */
    void execTasks()
    {
        // `keys` is a snapshot, so removing entries while iterating is safe.
        foreach (int k; tasks.keys)
        {
            if (tasks[k].state() == Fiber.State.HOLD)
                tasks[k].call();
            if (tasks[k].state() == Fiber.State.TERM)
                tasks.remove(k);
        }
    }

    /**
     * Apply the registration change a handler asked for.
     *
     * Params:
     *     result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
     *     s      = selector to update
     *     h      = dispatcher whose transport is affected
     */
    void processReturn(int result, Selector s, Dispatcher h)
    {
        switch (result)
        {
            case CLOSE:
                s.unregister(h.transport);
                h.transport.detach();
                break;
            case UNREGISTER:
                s.unregister(h.transport);
                break;
            case REMAIN:
                // this space intentionally left blank
                break;
            case REGISTER, REREGISTER:
                // Both codes perform the same register call.
                s.register(h.transport, h.events(), h);
                break;
            default:
                log.error("processReturn: unknown return value");
                break;
        }
    }
}
200 }