11
|
1 /*******************************************************************************
|
|
2
|
|
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved
|
|
4
|
|
5 license: BSD style: $(LICENSE)
|
|
6
|
|
7 version: Initial release v0.1 : May 2008
|
|
8
|
|
9 author: Rick Richardson
|
|
10
|
|
11 *******************************************************************************/
|
|
12
|
|
13 module dreactor.core.Vat;
|
|
14
|
|
15 import tango.io.selector.Selector;
|
|
16 import tango.io.selector.model.ISelector;
|
|
17 import tango.core.Exception;
|
|
18 import tango.core.Thread;
|
|
19 import tango.core.Atomic;
|
|
20 import tango.util.collection.CircularSeq;
|
|
21 import tango.util.log.Log;
|
|
22
|
|
23 import dreactor.transport.AsyncSocketConduit;
|
|
24 import dreactor.core.Task;
|
|
25 import dreactor.util.ThreadSafeQueue;
|
|
26
|
|
/// Identifies this source file and its revision.
static char[] version_string = "Vat.d 0.1 2008-05-31";

/// Module-level logger. It is assigned in Vat's default constructor, so it is
/// null until the first Vat has been created.
Logger log;

/// Result codes that Vat.processReturn switches on to decide what to do with
/// a handler's transport after an event has been handled.
enum : int
{
    CLOSE      = -2,   // unregister the transport and detach it
    UNREGISTER = -1,   // unregister the transport only
    REMAIN     =  0,   // leave the registration untouched
    REGISTER   =  1,   // register the transport with the selector
    REREGISTER =  2    // register again with the handler's current events
}

/// Handler signatures: a delegate or a plain function that takes the Conduit
/// and produces a Message.
alias Message delegate (Conduit c) HandlerDG;
alias Message function (Conduit c) HandlerFN;
|
|
34
|
|
/*******************************************************************************

    Pairs a Task with the handler to invoke on its behalf. The handler is
    either a delegate (dg) or a plain function (fn); each constructor sets
    exactly one of them and leaves the other null.

*******************************************************************************/
class TaskAttachment
{
    public Task task;
    HandlerDG dg;
    HandlerFN fn;

    /***************************************************************************

        Attach a delegate handler to a task.

        Params:
            ta = the task this attachment belongs to
            d  = delegate invoked by opCall

    ***************************************************************************/
    this(Task ta, HandlerDG d)
    {
        // Initialise this instance directly. The previous code wrote through
        // an uninitialised local reference and tried to return it.
        task = ta;
        dg = d;
    }

    /***************************************************************************

        Attach a plain-function handler to a task.

        Params:
            ta = the task this attachment belongs to
            f  = function invoked by opCall

    ***************************************************************************/
    this(Task ta, HandlerFN f)
    {
        task = ta;
        fn = f;
    }

    /***************************************************************************

        Invoke the stored handler with the given conduit.

        Params:
            c = conduit forwarded to the handler

        Returns:
            the Message produced by the delegate if one was set, otherwise the
            Message produced by the function

    ***************************************************************************/
    public Message opCall(Conduit c)
    {
        // HandlerFN takes the conduit too, so it is forwarded on both paths.
        return (dg is null) ? fn(c) : dg(c);
    }
}
|
|
50
|
|
/*******************************************************************************

    A Vat owns one thread that alternates between running its Tasks and
    polling a Selector for I/O events. Each selected key carries a Dispatcher
    as its attachment; the event is forwarded to that Dispatcher and the code
    it returns decides what happens to its registration (see processReturn).

*******************************************************************************/
class Vat
{
    // A bare attribute without a colon covers only the next declaration, so
    // in the original only `thread` was private. That visibility is preserved.
    private Thread thread;

    // NOTE(review): written by exit() and read by the event loop thread
    // without any synchronisation - confirm this is acceptable.
    bool running;

    Task[int] tasks;
    int taskCount;

    // Shared by the event loop and add/remConnection, so it must be a member
    // rather than a local of eventLoop().
    private Selector selector;

    /***************************************************************************

        Create a Vat that already holds one task, then start its thread.

        Params:
            t = first task to run; it is added before the thread starts

    ***************************************************************************/
    public this(Task t)
    {
        addTask(t);
        this();
    }

    /***************************************************************************

        Look up the module logger, open the selector and start the event loop
        thread.

    ***************************************************************************/
    this()
    {
        log = Log.lookup("dreactor.core.Vat");

        // Opened before the thread starts so add/remConnection never see a
        // null selector.
        selector = new Selector();
        selector.open();

        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /***************************************************************************

        Add a task to this Vat, giving it the next sequential id.

        Params:
            t = task to add; its vat and id are set here

    ***************************************************************************/
    void addTask(Task t)
    {
        t.setVat(this);
        ++taskCount;
        tasks[taskCount] = t;
        t.setId(taskCount);
    }

    /***************************************************************************

        Ask the event loop to stop. The flag is tested at the end of each loop
        iteration, so the current iteration still completes.

    ***************************************************************************/
    void exit()
    {
        running = false;
    }

    /***************************************************************************

        Block until the event loop thread has finished.

    ***************************************************************************/
    void wait()
    {
        thread.join();
    }

    /***************************************************************************

        Register a handler's transport with the selector, using the handler
        itself as the key attachment.

        Params:
            h = handler to register

        Returns:
            true if registration succeeded, false if the selector threw

    ***************************************************************************/
    bool addConnection(Dispatcher h)
    {
        // NOTE(review): the original declared no parameter yet used `h`. The
        // delegate type expected by Dispatcher.handleConnection is not visible
        // here - confirm it matches bool delegate(Dispatcher).
        log.trace("adding handler");
        try
        {
            // NOTE(review): ISelector.register is void in the Tango releases
            // I know, so success is reported by the absence of an exception -
            // confirm against the Tango version in use.
            selector.register(h.transport, h.events(), h);
            return true;
        }
        catch (SelectorException e)
        {
            log.error("addConnection: register failed: {}", e.msg);
            return false;
        }
    }

    /***************************************************************************

        Remove a handler's transport from the selector.

        Params:
            handler = handler whose transport is unregistered

        Returns:
            true if the transport was unregistered, false if the selector threw

    ***************************************************************************/
    bool remConnection(Dispatcher handler)
    {
        try
        {
            selector.unregister(handler.transport);
            return true;
        }
        catch (SelectorException e)
        {
            log.error("remConnection: unregister failed: {}", e.msg);
            return false;
        }
    }

    /***************************************************************************

        Thread body. Each iteration runs the pending tasks, polls the selector
        and dispatches any selected keys. The selector is closed once the loop
        ends.

    ***************************************************************************/
    private void eventLoop()
    {
        do
        {
            execTasks();

            // Short timeout so tasks keep being serviced while I/O is idle.
            // NOTE(review): presumably seconds for select(double) - confirm.
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                foreach (SelectionKey key; selector.selectedSet())
                    dispatchKey(key);
            }
            else if (eventCount < 0)
            {
                log.error("Selector.select returned {}", eventCount);
            }
            // eventCount == 0: nothing selected before the timeout.

        } while (running);

        // Release the selector now that nothing will poll it again.
        selector.close();
    }

    /***************************************************************************

        Forward one selected key to its Dispatcher. Only the first matching
        condition is handled, in the order readable, writable, hangup, error.

        Params:
            key = selection key whose attachment should be a Dispatcher

    ***************************************************************************/
    private void dispatchKey(SelectionKey key)
    {
        auto conn = cast(Dispatcher) key.attachment;
        if (conn is null)
        {
            // The cast yields null when the attachment is missing or of
            // another type; skip rather than dereference null.
            log.error("dispatchKey: selection key has no Dispatcher attached");
            return;
        }

        if (key.isReadable())
        {
            // incoming data, or a new connection on a listening handler
            log.trace("Read event fired");
            if (Dispatcher.State.listening == conn.getState())
                conn.handleConnection(conn.transport, &addConnection);
            else
                processReturn(conn.handleIncoming(), selector, conn);
        }
        else if (key.isWritable())
        {
            log.trace("Write event fired");
            processReturn(conn.handleOutgoing(), selector, conn);
        }
        else if (key.isHangup())
        {
            log.trace("Hangup event fired");
            processReturn(conn.handleDisconnect(), selector, conn);
        }
        else if (key.isError() || key.isInvalidHandle())
        {
            // error: let the handler drop the connection via remConnection
            log.trace("Error event fired");
            conn.handleError(&remConnection);
        }
    }

    /***************************************************************************

        Call every task whose state is Fiber.State.HOLD and drop every task
        whose state is Fiber.State.TERM afterwards.

    ***************************************************************************/
    void execTasks()
    {
        // Iterates over tasks.keys rather than the map itself, because
        // entries are removed inside the loop.
        foreach (int k; tasks.keys)
        {
            if (tasks[k].state() == Fiber.State.HOLD)
                tasks[k].call();
            if (tasks[k].state() == Fiber.State.TERM)
                tasks.remove(k);
        }
    }

    /***************************************************************************

        Apply the code a handler returned to its selector registration.

        Params:
            result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
            s      = selector to act on
            h      = handler whose transport is affected

    ***************************************************************************/
    void processReturn(int result, Selector s, Dispatcher h)
    {
        switch (result)
        {
            case CLOSE:
                s.unregister(h.transport);
                h.transport.detach();
                break;
            case UNREGISTER:
                s.unregister(h.transport);
                break;
            case REMAIN:
                // nothing to do: the registration stays as it is
                break;
            case REGISTER:
                s.register(h.transport, h.events(), h);
                break;
            case REREGISTER:
                // NOTE(review): some Tango releases need reregister() for an
                // already-registered conduit - confirm register() suffices.
                s.register(h.transport, h.events(), h);
                break;
            default:
                log.error("processReturn: unknown return value");
                break;
        }
    }
}
|