comparison dreactor/protocol/RawTcp.d @ 10:e75a2e506b1d

housekeeping
author rick@minifunk
date Fri, 01 Aug 2008 16:30:45 -0400
parents 5412a1ff2e49
children 5836613d16ac
comparison
equal deleted inserted replaced
9:5412a1ff2e49 10:e75a2e506b1d
14 /****************************************************************************** 14 /******************************************************************************
15 15
16 Basic TCP server or client routines for sending raw data. 16 Basic TCP server or client routines for sending raw data.
17 17
18 ******************************************************************************/ 18 ******************************************************************************/
19 class RawTCPListener : RawTCPHandler 19 class RawTCPListener
20 { 20 {
21 public 21 public
22 Logger log;
23 this(Dispatcher mgr, Vat sel, IPv4Address addr) 22 this(Dispatcher mgr, Vat sel, IPv4Address addr)
24 { 23 {
25 manager = mgr; 24 manager = mgr;
26 mgr.events(Event.Read); 25 mgr.events(Event.Read);
27 mgr.setOutgoingHandler(&onSend); 26 mgr.setOutgoingHandler(&RawTCPHandler.onSend);
28 mgr.setIncomingHandler(&onReceive); 27 mgr.setIncomingHandler(&onReceive);
29 mgr.setConnectHandler(&accept); 28 mgr.setConnectHandler(&accept);
30 mgr.setErrorHandler(&onError); 29 mgr.setErrorHandler(&RawTCPHandler.onError);
31 mgr.setDisconnectHandler(&onHangup); 30 mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
32 mgr.listen(addr); 31 mgr.listen(addr);
33 32
34 sel.addConnection(mgr); 33 sel.addConnection(mgr);
35 vat = sel; 34 vat = sel;
36 log = Log.lookup("dreactor.protocol.RawTcpServer"); 35 log = Log.lookup("dreactor.protocol.RawTcpServer");
45 this(lh, sel, addr); 44 this(lh, sel, addr);
46 } 45 }
47 46
48 ~this() 47 ~this()
49 { 48 {
50 foreach(Dispatcher d; children) 49 close();
51 {
52 (cast(AsyncSocketConduit)d.transport).shutdown();
53 (cast(AsyncSocketConduit)d.transport).detach();
54 }
55 (cast(AsyncSocketConduit)manager.transport).shutdown();
56 (cast(AsyncSocketConduit)manager.transport).detach();
57 } 50 }
58 51
59 int accept(Conduit cond, RegisterD reg) 52 int accept(Conduit cond, RegisterD reg)
60 { 53 {
61 AsyncSocketConduit newcond = new AsyncSocketConduit; 54 AsyncSocketConduit newcond = new AsyncSocketConduit;
94 int send(Dispatcher d, char[] outbuf, IPv4Address addr = null) 87 int send(Dispatcher d, char[] outbuf, IPv4Address addr = null)
95 { 88 {
96 if (d.appendOutBuffer(outbuf)) 89 if (d.appendOutBuffer(outbuf))
97 { 90 {
98 d.addEvent(Event.Write); 91 d.addEvent(Event.Write);
99 d.setOutgoingHandler(&onSend);
100 if (!vat.addConnection(d)) 92 if (!vat.addConnection(d))
101 { 93 {
102 log.error("unable to register mgr"); 94 log.error("unable to register mgr");
103 } 95 }
104 } 96 }
105 return 0; 97 return 0;
106 } 98 }
107 99
108 public int onReceive(Dispatcher h) 100 /**************************************************************************
101
102 receive
103 IncomingHandlerD
104 Default incoming data handler. Should be replaced with something useful.
105
106 **************************************************************************/
107 int onReceive(Dispatcher h)
109 { 108 {
110 Logger log = Log.lookup("Handlers.onReceive"); 109 Logger log = Log.lookup("Handlers.onReceive");
111 110
112 char inbuf[8192]; 111 char inbuf[8192];
113 int amt; 112 int amt;
131 return REMAIN; 130 return REMAIN;
132 } 131 }
133 132
134 void close() 133 void close()
135 { 134 {
135 foreach(Dispatcher d; children)
136 {
137 (cast(AsyncSocketConduit)d.transport).shutdown();
138 (cast(AsyncSocketConduit)d.transport).detach();
139 }
140 (cast(AsyncSocketConduit)manager.transport).shutdown();
141 (cast(AsyncSocketConduit)manager.transport).detach();
136 142
137 } 143 }
144
145 void setDataHandler(void delegate(char[], Dispatcher) h)
146 {
147 dataHandler = h;
148 }
138 149
139 private 150 private
140 Dispatcher manager;
141 Vat vat; 151 Vat vat;
142 CircularSeq!(Dispatcher) children; 152 CircularSeq!(Dispatcher) children;
153 Dispatcher manager;
154 Logger log;
155 RawTCPHandler h;
156 void delegate(char[], Dispatcher) dataHandler;
143 } 157 }
144 158
145 class RawTCPClient : RawTCPHandler 159 class RawTCPClient
146 { 160 {
161
147 public 162 public
148 Logger log;
149 this(Dispatcher mgr, Vat sel, Event evts = Event.Read) 163 this(Dispatcher mgr, Vat sel, Event evts = Event.Read)
150 { 164 {
151 manager = mgr; 165 manager = mgr;
152 manager.events(evts); 166 manager.events(evts);
153 connected = false; 167 connected = false;
154 mgr.setOutgoingHandler(&onSend); 168 mgr.setOutgoingHandler(&RawTCPHandler.onSend);
155 mgr.setIncomingHandler(&onReceive); 169 mgr.setIncomingHandler(&onReceive);
156 mgr.setErrorHandler(&onError); 170 mgr.setErrorHandler(&RawTCPHandler.onError);
157 mgr.setDisconnectHandler(&onHangup); 171 mgr.setDisconnectHandler(&RawTCPHandler.onHangup);
158 vat = sel; 172 vat = sel;
159 log = Log.lookup("dreactor.protocol.RawTcpClient"); 173 log = Log.lookup("dreactor.protocol.RawTcpClient");
160 } 174 }
161 175
162 this(Vat sel, Event evts = Event.Read) 176 this(Vat sel, Event evts = Event.Read)
212 } 226 }
213 } 227 }
214 return 0; 228 return 0;
215 } 229 }
216 230
231 /**************************************************************************
232
233 receive
234 IncomingHandlerD
235 Default incoming data handler. Should be replaced with something useful.
236
237 **************************************************************************/
238 public int onReceive(Dispatcher h)
239 {
240 Logger log = Log.lookup("Handlers.onReceive");
241
242 char inbuf[8192];
243 int amt;
244 if((amt = h.transport.read(inbuf)) > 0)
245 {
246 if (dataHandler)
247 dataHandler(inbuf[0 .. amt], h);
248 else
249 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
250 }
251 else
252 {
253 if (amt == 0)
254 {
255 return CLOSE;
256 }
257 log.error("Received no data, err = {}", amt);
258 }
259 return REMAIN;
260 }
261
262 void setDataHandler(void delegate(char[], Dispatcher) h)
263 {
264 dataHandler = h;
265 }
217 private 266 private
267 void delegate(char[], Dispatcher) dataHandler;
218 Dispatcher manager; 268 Dispatcher manager;
219 Vat vat; 269 Vat vat;
220 bool connected; 270 bool connected;
271 Logger log;
272 RawTCPHandler h;
221 } 273 }
222 274
223 275
224 /****************************************************************************** 276 /******************************************************************************
225 277
226 Default Event handlers common to both listener/clients 278 Default Event handlers common to both listener/clients
227 279
228 ******************************************************************************/ 280 ******************************************************************************/
229 class RawTCPHandler 281 struct RawTCPHandler
230 { 282 {
231 /************************************************************************** 283 /**************************************************************************
232 284
233 onSend 285 onSend
234 OutgoingHandlerD 286 OutgoingHandlerD
235 To be registered as the response to socket writable event. 287 To be registered as the response to socket writable event.
236 Sends data, returns amount sent. Unregisters Handler for sending 288 Sends data, returns amount sent. Unregisters Handler for sending
237 if there is no more data left to send. 289 if there is no more data left to send.
238 290
239 ***************************************************************************/ 291 ***************************************************************************/
240 public int onSend(Dispatcher h) 292 public static int onSend(Dispatcher h)
241 { 293 {
242 Logger log = Log.lookup("Handlers.onSend"); 294 Logger log = Log.lookup("Handlers.onSend");
243 295
244 char[] outbuf = h.nextBuffer(); 296 char[] outbuf = h.nextBuffer();
245 if (outbuf !is null) 297 if (outbuf !is null)
246 { 298 {
247 int sent = h.transport.write(outbuf); 299 int sent = h.transport.write(outbuf);
248 if (sent > 0) 300 if (sent > 0)
249 { 301 {
250 if (! h.addOffset(sent)) 302 if (! h.addOffset(sent))
251 { 303 {
252 h.remEvent(Event.Write); 304 h.remEvent(Event.Write);
253 return REREGISTER; 305 return REREGISTER;
254 } 306 }
255 } 307 }
256 else if (sent == AsyncSocketConduit.Eof) 308 else if (sent == AsyncSocketConduit.Eof)
257 { 309 {
258 log.error("Select said socket was writable, but sent 0 bytes"); 310 log.error("Select said socket was writable, but sent 0 bytes");
311 }
312 else
313 {
314 log.error("Socket send return ERR");
315 }
316 return REMAIN;
259 } 317 }
260 else 318 else
261 { 319 {
262 log.error("Socket send return ERR"); 320 h.remEvent(Event.Write);
263 } 321 return REREGISTER;
264 return REMAIN; 322 }
265 } 323 }
266 else 324
267 { 325 static int onHangup(Dispatcher d)
268 h.remEvent(Event.Write); 326 {
269 return REREGISTER; 327 return UNREGISTER;
270 } 328 }
271 } 329
272 /************************************************************************** 330 static int onError(Dispatcher d, RegisterD unreg)
273 331 {
274 receive 332 return CLOSE;
275 IncomingHandlerD 333 }
276 Default incoming data handler. Should be replaced with something useful. 334
277
278 **************************************************************************/
279 public int onReceive(Dispatcher h)
280 {
281 Logger log = Log.lookup("Handlers.onReceive");
282
283 char inbuf[8192];
284 int amt;
285 if((amt = h.transport.read(inbuf)) > 0)
286 {
287 if (dataHandler)
288 dataHandler(inbuf[0 .. amt], h);
289 else
290 log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);
291 }
292 else
293 {
294 if (amt == 0)
295 {
296 return CLOSE;
297 }
298 log.error("Received no data, err = {}", amt);
299 }
300 return REMAIN;
301 }
302 int onHangup(Dispatcher d)
303 {
304 return UNREGISTER;
305 }
306 int onError(Dispatcher d, RegisterD unreg)
307 {
308 return CLOSE;
309 }
310
311 void setDataHandler(void delegate(char[],Dispatcher) del)
312 {
313 dataHandler = del;
314 }
315 protected
316 void delegate(char[], Dispatcher) dataHandler;
317 } 335 }
318 336
319 bool includes(Dispatcher[] haystack, Dispatcher needle) 337 bool includes(Dispatcher[] haystack, Dispatcher needle)
320 { 338 {
321 foreach(Dispatcher h; haystack) 339 foreach(Dispatcher h; haystack)