comparison dreactor/core/Dispatcher.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents 5412a1ff2e49
children
comparison
equal deleted inserted replaced
10:e75a2e506b1d 11:5836613d16ac
1 module dreactor.core.Dispatcher; 1 module dreactor.protocol.Dispatcher;
2 2
3 import tango.io.selector.model.ISelector; 3 import tango.io.selector.model.ISelector;
4 import tango.util.collection.CircularSeq; 4 import tango.util.collection.CircularSeq;
5 import tango.net.Socket; 5 import tango.net.Socket;
6 public import tango.core.Exception; 6 public import tango.core.Exception;
7 import dreactor.transport.AsyncSocketConduit; 7 import dreactor.transport.AsyncSocketConduit;
8 8
9 import tango.util.log.Log; 9 import tango.util.log.Log;
10 import tango.util.log.Config; 10 import tango.util.log.Config;
11 11
12 alias bool delegate(Dispatcher) RegisterD;
13
14 alias int delegate(Dispatcher) IncomingHandlerD;
15 alias int delegate(Dispatcher) OutgoingHandlerD;
16 alias int delegate(Dispatcher, RegisterD) ErrorHandlerD;
17 alias int delegate(Dispatcher) DisconnectHandlerD;
18 alias int delegate(Conduit, RegisterD) ConnectHandlerD;
19
20 alias int function(Dispatcher) IncomingHandlerF;
21 alias int function(Dispatcher) OutgoingHandlerF;
22 alias int function(Dispatcher, RegisterD) ErrorHandlerF;
23 alias int function(Dispatcher) DisconnectHandlerF;
24 alias int function(Conduit, RegisterD) ConnectHandlerF;
25
26
27 /******************************************************************************
28 Dispatcher object. To be used by the SelectLoop to manage callbacks
29 for events. It may also be used to buffer data in between requests.
30 These can be populated and passed to a SelectLoop directly by the end user,
31 or may be managed by a chosen Protocol.
32 ******************************************************************************/
33 class Dispatcher 12 class Dispatcher
34 { 13 {
35 public 14 public
36 enum State { init, connected, listening, idle, closing }; 15 this (Conduit trans)
37
38 /**************************************************************************
39
40 Standard Ctor, takes a transport_
41
42 **************************************************************************/
43 this (Conduit trans, bool listener = false)
44 { 16 {
45 transport_ = trans; 17 cond = trans;
46 ibuf_len = 0; 18 ibuf_len = 0;
47 i_offset = 0;
48 o_offset = 0; 19 o_offset = 0;
49 out_buffers = new CircularSeq!(char[]); 20 out_buffers = new CircularSeq!(char[]);
50 log = Log.lookup("dreactor.core.Dispatcher"); 21 log = Log.lookup("dreactor.core.Dispatcher");
51 } 22 }
52 23
53 /********************************************************************** 24 /**************************************************************************
54 25
55 Setters for the handlers. These are set by the Protocols as well 26 onSend -- Send method
56 27 Called by the vat in response to an FD writeable event.
57 **********************************************************************/ 28 Sends data, returns amount sent. Unregisters Handler for sending
58 29 if there is no more data left to send.
59 void setIncomingHandler(IncomingHandlerD hand) 30
31 ***************************************************************************/
32 public int onSend()
60 { 33 {
61 inD = hand; 34 Logger log = Log.lookup("Handlers.onSend");
62 inF = null; 35
63 } 36 char[] outbuf = nextBuffer();
64 37 if (outbuf !is null)
65 void setIncomingHandler(IncomingHandlerF hand)
66 {
67 inF = hand;
68 inD = null;
69 }
70
71 void setOutgoingHandler(OutgoingHandlerD hand)
72 {
73 outD = hand;
74 outF = null;
75 }
76
77 void setOutgoingHandler(OutgoingHandlerF hand)
78 {
79 outF = hand;
80 outD = null;
81 }
82
83 void setErrorHandler(ErrorHandlerD hand)
84 {
85 errD = hand;
86 errF = null;
87 }
88
89 void setErrorHandler(ErrorHandlerF hand)
90 {
91 errF = hand;
92 errD = null;
93 }
94
95 void setDisconnectHandler(DisconnectHandlerD hand)
96 {
97 disD = hand;
98 disF = null;
99 }
100
101 void setDisconnectHandler(DisconnectHandlerF hand)
102 {
103 disF = hand;
104 disD = null;
105 }
106
107 void setConnectHandler(ConnectHandlerD hand)
108 {
109 conD = hand;
110 conF = null;
111 }
112
113 void setConnectHandler(ConnectHandlerF hand)
114 {
115 conF = hand;
116 conD = null;
117 }
118
119 /**********************************************************************
120
121 Handlers to be called by the SelectLoop when events occur
122
123 **********************************************************************/
124 int handleIncoming()
125 {
126 if (inD !is null)
127 return inD(this);
128 else if (inF !is null)
129 return inF(this);
130 else
131 throw new Exception("no Incoming handler set");
132 }
133
134 int handleOutgoing()
135 {
136 if (outD !is null)
137 return outD(this);
138 else if (outF !is null)
139 return outF(this);
140 else
141 throw new Exception("no Outgoing handler set");
142 }
143
144 int handleError(RegisterD reg)
145 {
146 if (errD !is null)
147 return errD(this, reg);
148 else if (errF !is null)
149 return errF(this, reg);
150 }
151
152 int handleDisconnect()
153 {
154 if (disD !is null)
155 return disD(this);
156 else if (disF !is null)
157 return disF(this);
158 }
159
160 int handleConnection(Conduit cond, RegisterD reg )
161 {
162 if (conD !is null)
163 { 38 {
164 return conD(cond, reg); 39 int sent = cond.write(outbuf);
40 if (sent > 0)
41 {
42 if (! addOffset(sent))
43 {
44 return UNREGISTER;
45 }
46 }
47 else if (sent == 0)
48 {
49 log.error("Select said socket was writable, but sent 0 bytes");
50 }
51 else
52 {
53 log.error("Socket send return ERR {}", sent);
54 }
55 return REMAIN;
165 } 56 }
166 else if (conF !is null) 57 else
167 { 58 {
168 return conF(cond, reg); 59 return UNREGISTER;
169 } 60 }
170 } 61 }
171
172 /**************************************************************************
173
174 Sending / Receiving helpers
175
176 **************************************************************************/
177 62
178 /************************************************************************** 63 /**************************************************************************
179 64
180 appendOutBuffer 65 appendOutBuffer
181 66
182 Adds an outgoing buffer to the list. This returns true if the list 67 Adds an outgoing buffer to the list. This returns true if the list
183 was empty, indicating that the handler should be registered with the 68 was empty, indicating that the handler should be registered with the
184 SelectLoop. If it returns false, it was probably already registered. 69 SelectLoop. If it returns false, it was probably already registered.
185 70
186 **************************************************************************/ 71 **************************************************************************/
187 synchronized bool appendOutBuffer(char[] outbuf) 72 bool appendOutBuffer(char[] outbuf)
188 { 73 {
189 out_buffers.append(outbuf); 74 out_buffers.append(outbuf);
190 out_buffers_len++; 75 out_buffers_len++;
191 if (out_buffers_len == 1) 76 if (out_buffers_len == 1)
192 return true; 77 return true;
193 else 78 else
194 return false; 79 return false;
195 } 80 }
196 81
197 /************************************************************************** 82 /**************************************************************************
198 83
199 addOffset 84 addOffset
200 Use this function to update the offset position after a successful data 85 Use this function to update the offset position after a successful data
201 send. This not only manages the current offset, but will update the 86 send. This not only manages the current offset, but will update the
202 out buffer chain if necessary. 87 out buffer chain if necessary.
203 88
204 Returns: false if there is nothing left to send, true if there is. 89 Returns: false if there is nothing left to send, true if there is.
205 90
206 **************************************************************************/ 91 **************************************************************************/
207 synchronized bool addOffset(int off) 92 bool addOffset(int off)
208 in 93 in
209 { 94 {
210 assert(out_buffers_len > 0); 95 assert(out_buffers_len > 0);
211 } 96 }
212 body 97 body
221 } 106 }
222 else 107 else
223 o_offset += off; 108 o_offset += off;
224 return true; 109 return true;
225 } 110 }
226 111
227 /************************************************************************** 112 /**************************************************************************
228 113
229 char[] nextBuffer 114 char[] nextBuffer
230 115
231 Returns a slice of the current outbound buffer, returns a char[] pointing 116 Returns a slice of the current outbound buffer, returns a char[] pointing
240 } 125 }
241 126
242 return out_buffers.head()[o_offset .. $]; 127 return out_buffers.head()[o_offset .. $];
243 } 128 }
244 129
245 /************************************************************************** 130 Conduit cond;
246
247 listen
248 Enable listening on the socket attached to this Dispatcher
249
250 **************************************************************************/
251 int listen(IPv4Address addr)
252 {
253 (cast(AsyncSocketConduit)transport_).bind(addr).listen();
254 state_ = State.listening;
255 return 0;
256 }
257
258 Conduit transport()
259 {
260 return transport_;
261 }
262 /**************************************************************************
263
264 Configuration functions
265
266 **************************************************************************/
267 Event events()
268 {
269 return events_;
270 }
271 void events(Event e)
272 {
273 events_ = e;
274 }
275 void addEvent(Event e)
276 {
277 events_ |= e;
278 }
279 void remEvent(Event e)
280 {
281 events_ &= ~e;
282 }
283
284 State getState() {return state_;}
285
286 /*
287 connection handlers are left out of this because
288 this method is used by the listener socket to pass
289 on its handlers to the accepted socket. An accepted
290 socket will generally do different things onConnection
291 */
292 void setHandlers(Dispatcher other)
293 {
294 inD = other.inD;
295 outD = other.outD;
296 errD = other.errD;
297 disD = other.disD;
298 inF = other.inF;
299 outF = other.outF;
300 errF = other.errF;
301 disF = other.disF;
302 }
303
304 /**************************************************************************
305
306 Freelist allocators and deallocators
307
308 **************************************************************************/
309 static synchronized Dispatcher New(Conduit tran, Dispatcher other = null)
310 {
311 Dispatcher hand;
312 if (freelist)
313 {
314 hand = freelist;
315 freelist = hand.next;
316 hand.transport_ = tran;
317 }
318 else
319 hand = new Dispatcher(tran);
320
321 if (!(other is null))
322 {
323 hand.setHandlers(other);
324 }
325 return hand;
326 }
327
328 static synchronized void Delete(Dispatcher hand)
329 {
330 hand.next = freelist;
331 freelist = hand.initialize();
332 }
333
334 private
335
336 char[] in_buffer;
337 CircularSeq!(char[]) out_buffers; 131 CircularSeq!(char[]) out_buffers;
338 int out_buffers_len; 132 int out_buffers_len;
339 int ibuf_len; 133 int ibuf_len;
340 int i_offset;
341 int o_offset; 134 int o_offset;
342 Logger log; 135 Logger log;
343
344 package Conduit transport_;
345 State state_;
346 Event events_;
347 IncomingHandlerD inD;
348 OutgoingHandlerD outD;
349 ErrorHandlerD errD;
350 DisconnectHandlerD disD;
351 ConnectHandlerD conD;
352
353 IncomingHandlerF inF;
354 OutgoingHandlerF outF;
355 ErrorHandlerF errF;
356 DisconnectHandlerF disF;
357 ConnectHandlerF conF;
358
359 static Dispatcher freelist;
360 Dispatcher next;
361
362 /**************************************************************************
363 Resets this Dispatcher to its initial state: clears the transport, state,
364 offsets, out buffers and all handlers, then returns this instance.
365 **************************************************************************/
366 Dispatcher initialize()
367 {
368 transport_ = null;
369 state_ = State.init;
370 ibuf_len = 0;
371 i_offset = 0;
372 o_offset = 0;
373 out_buffers.clear();
374 inD = null;
375 outD = null;
376 errD = null;
377 disD = null;
378 conD = null;
379 inF = null;
380 outF = null;
381 errF = null;
382 disF = null;
383 conF = null;
384 return this;
385 }
386 } 136 }
387