6
|
1 module dreactor.core.Dispatcher;
|
0
|
2
|
|
3 import tango.io.selector.model.ISelector;
|
|
4 import tango.util.collection.CircularSeq;
|
3
|
5 import tango.net.Socket;
|
|
6 public import tango.core.Exception;
|
|
7 import dreactor.transport.AsyncSocketConduit;
|
0
|
8
|
3
|
9 import tango.util.log.Log;
|
|
10 import tango.util.log.Config;
|
0
|
11
|
6
|
/// Callback taking a Dispatcher and returning bool. It is handed to the error
/// and connect handlers below.
/// NOTE(review): presumably used to register a Dispatcher with the select
/// loop -- confirm against the SelectLoop implementation.
alias bool delegate(Dispatcher) RegisterD;

/// Incoming-data handlers, in delegate and plain-function flavours.
alias int delegate(Dispatcher) IncomingHandlerD;
alias int function(Dispatcher) IncomingHandlerF;

/// Outgoing-data handlers, in delegate and plain-function flavours.
alias int delegate(Dispatcher) OutgoingHandlerD;
alias int function(Dispatcher) OutgoingHandlerF;

/// Error handlers; they additionally receive a RegisterD callback.
alias int delegate(Dispatcher, RegisterD) ErrorHandlerD;
alias int function(Dispatcher, RegisterD) ErrorHandlerF;

/// Disconnect handlers, in delegate and plain-function flavours.
alias int delegate(Dispatcher) DisconnectHandlerD;
alias int function(Dispatcher) DisconnectHandlerF;

/// Connect handlers; they receive a Conduit (rather than a Dispatcher)
/// together with a RegisterD callback.
alias int delegate(Conduit, RegisterD) ConnectHandlerD;
alias int function(Conduit, RegisterD) ConnectHandlerF;
|
0
|
25
|
|
26
|
|
/******************************************************************************

    Dispatcher object. To be used by the SelectLoop to manage callbacks
    for events. It may also be used to buffer data in between requests.
    These can be populated and passed to a SelectLoop directly by the end
    user, or may be managed by a chosen Protocol.

******************************************************************************/
|
6
|
class Dispatcher
{
    /// Connection states tracked in state_.
    public enum State { init, connected, listening, idle, closing }

    /**************************************************************************

        Standard Ctor, takes a transport_

        Params:
            trans    = the conduit this dispatcher is attached to
            listener = not used by this constructor; kept so that existing
                       callers keep compiling

    **************************************************************************/
    this (Conduit trans, bool listener = false)
    {
        transport_ = trans;
        ibuf_len = 0;
        i_offset = 0;
        o_offset = 0;
        out_buffers_len = 0;
        out_buffers = new CircularSeq!(char[]);
        log = Log.lookup("dreactor.core.Dispatcher");
    }

    /**********************************************************************

        Setters for the handlers. These are set by the Protocols as well.

        Every event has a delegate slot and a function slot. Setting one
        clears the other, so at most one of the pair is ever active.

    **********************************************************************/

    void setIncomingHandler(IncomingHandlerD hand)
    {
        inD = hand;
        inF = null;
    }

    void setIncomingHandler(IncomingHandlerF hand)
    {
        inF = hand;
        inD = null;
    }

    void setOutgoingHandler(OutgoingHandlerD hand)
    {
        outD = hand;
        outF = null;
    }

    void setOutgoingHandler(OutgoingHandlerF hand)
    {
        outF = hand;
        outD = null;
    }

    void setErrorHandler(ErrorHandlerD hand)
    {
        errD = hand;
        errF = null;
    }

    void setErrorHandler(ErrorHandlerF hand)
    {
        errF = hand;
        errD = null;
    }

    void setDisconnectHandler(DisconnectHandlerD hand)
    {
        disD = hand;
        disF = null;
    }

    void setDisconnectHandler(DisconnectHandlerF hand)
    {
        disF = hand;
        disD = null;
    }

    void setConnectHandler(ConnectHandlerD hand)
    {
        conD = hand;
        conF = null;
    }

    void setConnectHandler(ConnectHandlerF hand)
    {
        conF = hand;
        conD = null;
    }

    /**********************************************************************

        Handlers to be called by the SelectLoop when events occur.

        Each one forwards to the delegate if set, otherwise to the function
        if set, and returns the handler's result.

        Throws: Exception when neither flavour of the handler has been set.

    **********************************************************************/
    int handleIncoming()
    {
        if (inD !is null)
            return inD(this);
        else if (inF !is null)
            return inF(this);
        else
            throw new Exception("no Incoming handler set");
    }

    int handleOutgoing()
    {
        if (outD !is null)
            return outD(this);
        else if (outF !is null)
            return outF(this);
        else
            throw new Exception("no Outgoing handler set");
    }

    int handleError(RegisterD reg)
    {
        if (errD !is null)
            return errD(this, reg);
        else if (errF !is null)
            return errF(this, reg);
        else
            // Previously control fell off the end of this non-void function.
            throw new Exception("no Error handler set");
    }

    int handleDisconnect()
    {
        if (disD !is null)
            return disD(this);
        else if (disF !is null)
            return disF(this);
        else
            // Previously control fell off the end of this non-void function.
            throw new Exception("no Disconnect handler set");
    }

    int handleConnection(Conduit cond, RegisterD reg)
    {
        if (conD !is null)
            return conD(cond, reg);
        else if (conF !is null)
            return conF(cond, reg);
        else
            // Previously control fell off the end of this non-void function.
            throw new Exception("no Connect handler set");
    }

    /**************************************************************************

        Sending / Receiving helpers

    **************************************************************************/

    /**************************************************************************

        appendOutBuffer

        Adds an outgoing buffer to the list. This returns true if the list
        was empty, indicating that the handler should be registered with the
        SelectLoop. If it returns false, it was probably already registered.

    **************************************************************************/
    synchronized bool appendOutBuffer(char[] outbuf)
    {
        out_buffers.append(outbuf);
        out_buffers_len++;
        // A count of exactly one means the list was empty before this call.
        return out_buffers_len == 1;
    }

    /**************************************************************************

        addOffset

        Use this function to update the offset position after a successful
        data send. This not only manages the current offset, but will update
        the out buffer chain if necessary.

        Params:
            off = amount to advance the offset into the head buffer by

        Returns: false if there is nothing left to send, true if there is.

    **************************************************************************/
    synchronized bool addOffset(int off)
    in
    {
        assert(out_buffers_len > 0);
    }
    body
    {
        char[] hd = out_buffers.head();
        if ((off + o_offset) >= hd.length)
        {
            // The head buffer is used up: drop it and start the next one
            // from its beginning.
            out_buffers.removeHead();
            o_offset = 0;
            out_buffers_len--;
            return (out_buffers_len > 0);
        }

        o_offset += off;
        return true;
    }

    /**************************************************************************

        char[] nextBuffer

        Returns a slice of the current outbound buffer starting at the
        current offset, or null if there is no current outbound buffer.

    **************************************************************************/
    synchronized char[] nextBuffer()
    {
        if (out_buffers_len < 1)
        {
            return null;
        }

        return out_buffers.head()[o_offset .. $];
    }

    /**************************************************************************

        listen

        Enable listening on the socket attached to this connection handler:
        binds the transport to addr, starts listening and moves the state to
        State.listening.

        Returns: always 0.

        Throws: Exception if the transport is not an AsyncSocketConduit
                (previously this dereferenced the null result of the cast).

    **************************************************************************/
    int listen(IPv4Address addr)
    {
        auto sock = cast(AsyncSocketConduit) transport_;
        if (sock is null)
            throw new Exception("listen requires an AsyncSocketConduit transport");

        sock.bind(addr).listen();
        state_ = State.listening;
        return 0;
    }

    /// Returns the conduit this dispatcher is attached to.
    Conduit transport()
    {
        return transport_;
    }

    /**************************************************************************

        Configuration functions

        events_ is treated as a bit set: addEvent ORs bits in, remEvent
        masks them out.

    **************************************************************************/
    Event events()
    {
        return events_;
    }

    void events(Event e)
    {
        events_ = e;
    }

    void addEvent(Event e)
    {
        events_ |= e;
    }

    void remEvent(Event e)
    {
        events_ &= ~e;
    }

    /// Returns the current connection state.
    State getState()
    {
        return state_;
    }

    /*
        Copies the incoming, outgoing, error and disconnect handlers (both
        delegate and function flavours) from other.

        connection handlers are left out of this because
        this method is used by the listener socket to pass
        on its handlers to the accepted socket. An accepted
        socket will generally do different things onConnection
    */
    void setHandlers(Dispatcher other)
    {
        inD = other.inD;
        outD = other.outD;
        errD = other.errD;
        disD = other.disD;
        inF = other.inF;
        outF = other.outF;
        errF = other.errF;
        disF = other.disF;
    }

    /**************************************************************************

        Freelist allocators and deallocators

    **************************************************************************/

    /**************************************************************************

        Returns a Dispatcher attached to tran, reusing an instance from the
        free list when one is available and allocating a new one otherwise.
        If other is non-null its handlers are copied via setHandlers.

    **************************************************************************/
    static synchronized Dispatcher New(Conduit tran, Dispatcher other = null)
    {
        Dispatcher hand;
        if (freelist !is null)
        {
            hand = freelist;
            freelist = hand.next;
            // Detach the recycled object from the rest of the free list.
            hand.next = null;
            hand.transport_ = tran;
        }
        else
        {
            hand = new Dispatcher(tran);
        }

        if (other !is null)
        {
            hand.setHandlers(other);
        }
        return hand;
    }

    /**************************************************************************

        Resets hand and pushes it onto the front of the free list so that a
        later call to New can reuse it.

    **************************************************************************/
    static synchronized void Delete(Dispatcher hand)
    {
        hand.next = freelist;
        freelist = hand.initialize();
    }

    // NOTE(review): the original wrote `private` without a colon, so it
    // applies to in_buffer only; the members below keep their default
    // access. That is preserved here -- confirm whether `private:` was
    // intended before tightening it.
    private char[] in_buffer;
    CircularSeq!(char[]) out_buffers;
    int out_buffers_len;        // number of buffers held in out_buffers
    int ibuf_len;
    int i_offset;
    int o_offset;               // offset into the head of out_buffers
    Logger log;

    package Conduit transport_;
    State state_;
    Event events_;
    IncomingHandlerD inD;
    OutgoingHandlerD outD;
    ErrorHandlerD errD;
    DisconnectHandlerD disD;
    ConnectHandlerD conD;

    IncomingHandlerF inF;
    OutgoingHandlerF outF;
    ErrorHandlerF errF;
    DisconnectHandlerF disF;
    ConnectHandlerF conF;

    static Dispatcher freelist; // head of the list of recycled instances
    Dispatcher next;            // link to the next entry in the free list

    /**************************************************************************

        Resets this Dispatcher so it can be recycled through the free list:
        drops the transport, returns to State.init, clears the offsets, the
        event mask, the queued outgoing buffers and every handler.

        The `next` link is deliberately left alone; Delete sets it before
        calling this.

        Returns: this

    **************************************************************************/
    Dispatcher initialize()
    {
        transport_ = null;
        state_ = State.init;
        // Same value a freshly constructed instance starts with.
        events_ = Event.init;
        ibuf_len = 0;
        i_offset = 0;
        o_offset = 0;
        out_buffers.clear();
        // Keep the counter in step with the now-empty buffer list; it used
        // to be left stale, which broke appendOutBuffer/nextBuffer on reuse.
        out_buffers_len = 0;
        inD = null;
        outD = null;
        errD = null;
        disD = null;
        conD = null;
        inF = null;
        outF = null;
        errF = null;
        disF = null;
        conF = null;
        return this;
    }
}
|
|
387
|