comparison asyncdreactor/protocol/RawTcp.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents dreactor/protocol/RawTcp.d@e75a2e506b1d
children
comparison
equal deleted inserted replaced
10:e75a2e506b1d 11:5836613d16ac
1 module dreactor.protocol.RawTcp;
2
3 import tango.io.Conduit;
4 import tango.io.selector.model.ISelector;
5 import tango.net.Socket;
6 import tango.util.collection.CircularSeq;
7 import tango.util.log.Log;
8 import tango.util.log.Config;
9
10 import dreactor.transport.AsyncSocketConduit;
11 import dreactor.core.Vat;
12 import dreactor.core.Dispatcher;
13
14 /******************************************************************************
15
16 Basic TCP server or client routines for sending raw data.
17
18 ******************************************************************************/
19 class RawTCPListener
20 {
21 public
22 this(Dispatcher mgr, Vat sel, IPv4Address addr)
23 {
24 manager = mgr;
25 mgr.events(Event.Read);
26 mgr.setOutgoingHandler(&RawTCPHandler.onSend);
27 mgr.setIncomingHandler(&onReceive);
28 mgr.setConnectHandler(&accept);
29 mgr.setErrorHandler(&RawTCPHandler.onError);
30 mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
31 mgr.listen(addr);
32
33 sel.addConnection(mgr);
34 vat = sel;
35 log = Log.lookup("dreactor.protocol.RawTcpServer");
36 log.info("log initialized");
37 children = new CircularSeq!(Dispatcher);
38 }
39 this(Vat sel, IPv4Address addr)
40 {
41 AsyncSocketConduit cond = new AsyncSocketConduit;
42 cond.socket().setAddressReuse(true);
43 Dispatcher lh = new Dispatcher(cond, true);
44 this(lh, sel, addr);
45 }
46
47 ~this()
48 {
49 close();
50 }
51
52 int accept(Conduit cond, RegisterD reg)
53 {
54 AsyncSocketConduit newcond = new AsyncSocketConduit;
55 (cast(AsyncSocketConduit)cond).socket().accept(newcond.socket);
56 Dispatcher h = Dispatcher.New(newcond, manager);
57 h.events(Event.Read);
58 vat.addConnection(h);
59 children.append(h);
60 log.info("accepted new connection");
61 return 0;
62 }
63
64 int broadcast(char[] outbuf, Dispatcher[] excluded = null)
65 {
66 foreach(Dispatcher h; children)
67 {
68 if (excluded && excluded.includes(h))
69 continue;
70 if (h.appendOutBuffer(outbuf))
71 {
72 h.addEvent(Event.Write);
73 vat.addConnection(h);
74 }
75 }
76 return 0;
77 }
78
79 /**************************************************************************
80
81 send
82 User-called function to send data to the counterpart at the other
83 end of the connection. This sets up the connection manager to send
84 data as the socket becomes free.
85
86 **************************************************************************/
87 int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
88 {
89 if (d.appendOutBuffer(outbuf))
90 {
91 d.addEvent(Event.Write);
92 if (!vat.addConnection(d))
93 {
94 log.error("unable to register mgr");
95 }
96 }
97 return 0;
98 }
99
100 /**************************************************************************
101
102 receive
103 IncomingHandlerD
104 Default incoming data handler. Should be replaced with something useful.
105
106 **************************************************************************/
107 int onReceive(Dispatcher h)
108 {
109 Logger log = Log.lookup("Handlers.onReceive");
110
111 char inbuf[8192];
112 int amt;
113 if((amt = h.transport.read(inbuf)) > 0)
114 {
115 if (dataHandler)
116 dataHandler(inbuf[0 .. amt], h);
117 else
118 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
119 }
120 else
121 {
122 if (amt == 0)
123 {
124 children.remove(h);
125 (cast(AsyncSocketConduit) h.transport).shutdown();
126 return CLOSE;
127 }
128 log.error("Received no data, err = {}", amt);
129 }
130 return REMAIN;
131 }
132
133 void close()
134 {
135 foreach(Dispatcher d; children)
136 {
137 (cast(AsyncSocketConduit)d.transport).shutdown();
138 (cast(AsyncSocketConduit)d.transport).detach();
139 }
140 (cast(AsyncSocketConduit)manager.transport).shutdown();
141 (cast(AsyncSocketConduit)manager.transport).detach();
142
143 }
144
145 void setDataHandler(void delegate(char[], Dispatcher) h)
146 {
147 dataHandler = h;
148 }
149
150 private
151 Vat vat;
152 CircularSeq!(Dispatcher) children;
153 Dispatcher manager;
154 Logger log;
155 RawTCPHandler h;
156 void delegate(char[], Dispatcher) dataHandler;
157 }
158
159 class RawTCPClient
160 {
161
162 public
163 this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
164 {
165 manager = mgr;
166 manager.events(evts);
167 connected = false;
168 mgr.setOutgoingHandler(&RawTCPHandler.onSend);
169 mgr.setIncomingHandler(&onReceive);
170 mgr.setErrorHandler(&RawTCPHandler.onError);
171 mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
172 vat = sel;
173 log = Log.lookup("dreactor.protocol.RawTcpClient");
174 }
175
176 this(Vat sel, Event evts = Event.Read)
177 {
178 AsyncSocketConduit clcond = new AsyncSocketConduit;
179 Dispatcher ch = new Dispatcher(clcond);
180 this(ch, sel, evts);
181 }
182
183 ~this()
184 {
185 (cast(AsyncSocketConduit)manager.transport).shutdown();
186 (cast(AsyncSocketConduit)manager.transport).detach();
187 }
188
189 int connect(IPv4Address addr)
190 {
191 (cast(AsyncSocketConduit) manager.transport()).connect(addr);
192 vat.addConnection(manager);
193 connected = true;
194 log.info("connected to {}", addr);
195 return 0;
196 }
197
198 /**************************************************************************
199
200 send
201 User-called function to send data to the counterpart at the other
202 end of the connection. This sets up the connection manager to send
203 data as the socket becomes free.
204
205 **************************************************************************/
206 int send(char[] outbuf, IPv4Address addr = null)
207 {
208 if (!connected)
209 {
210 log.info("send: not connected, connecting");
211 if (addr !is null)
212 {
213 if (0 > connect(addr))
214 {
215 log.error("send: unable to connect");
216 return -1;
217 }
218 }
219 }
220 if (manager.appendOutBuffer(outbuf))
221 {
222 manager.addEvent(Event.Write);
223 if (!vat.addConnection(manager))
224 {
225 log.error("unable to register mgr");
226 }
227 }
228 return 0;
229 }
230
231 /**************************************************************************
232
233 receive
234 IncomingHandlerD
235 Default incoming data handler. Should be replaced with something useful.
236
237 **************************************************************************/
238 public int onReceive(Dispatcher h)
239 {
240 Logger log = Log.lookup("Handlers.onReceive");
241
242 char inbuf[8192];
243 int amt;
244 if((amt = h.transport.read(inbuf)) > 0)
245 {
246 if (dataHandler)
247 dataHandler(inbuf[0 .. amt], h);
248 else
249 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
250 }
251 else
252 {
253 if (amt == 0)
254 {
255 return CLOSE;
256 }
257 log.error("Received no data, err = {}", amt);
258 }
259 return REMAIN;
260 }
261
262 void setDataHandler(void delegate(char[], Dispatcher) h)
263 {
264 dataHandler = h;
265 }
266 private
267 void delegate(char[], Dispatcher) dataHandler;
268 Dispatcher manager;
269 Vat vat;
270 bool connected;
271 Logger log;
272 RawTCPHandler h;
273 }
274
275
276 /******************************************************************************
277
278 Default Event handlers common to both listener/clients
279
280 ******************************************************************************/
281 struct RawTCPHandler
282 {
283 /**************************************************************************
284
285 onSend
286 OutgoingHandlerD
287 To be registered as the response to socket writable event.
288 Sends data, returns amount sent. Unregisters Handler for sending
289 if there is no more data left to send.
290
291 ***************************************************************************/
292 public static int onSend(Dispatcher h)
293 {
294 Logger log = Log.lookup("Handlers.onSend");
295
296 char[] outbuf = h.nextBuffer();
297 if (outbuf !is null)
298 {
299 int sent = h.transport.write(outbuf);
300 if (sent > 0)
301 {
302 if (! h.addOffset(sent))
303 {
304 h.remEvent(Event.Write);
305 return REREGISTER;
306 }
307 }
308 else if (sent == AsyncSocketConduit.Eof)
309 {
310 log.error("Select said socket was writable, but sent 0 bytes");
311 }
312 else
313 {
314 log.error("Socket send return ERR");
315 }
316 return REMAIN;
317 }
318 else
319 {
320 h.remEvent(Event.Write);
321 return REREGISTER;
322 }
323 }
324
325 static int onHangup(Dispatcher d)
326 {
327 return UNREGISTER;
328 }
329
330 static int onError(Dispatcher d, RegisterD unreg)
331 {
332 return CLOSE;
333 }
334
335 }
336
337 bool includes(Dispatcher[] haystack, Dispatcher needle)
338 {
339 foreach(Dispatcher h; haystack)
340 {
341 if (h is needle)
342 return true;
343 }
344 return false;
345 }