4
|
1 module dreactor.protocol.RawTcp;
|
|
2
|
|
3 import tango.io.Conduit;
|
|
4 import tango.io.selector.model.ISelector;
|
|
5 import tango.net.Socket;
|
|
6 import tango.util.collection.CircularSeq;
|
|
7 import tango.util.log.Log;
|
|
8 import tango.util.log.Config;
|
|
9
|
|
10 import dreactor.transport.AsyncSocketConduit;
|
5
|
11 import dreactor.core.Vat;
|
4
|
12 import dreactor.core.ConnectionHandler;
|
|
13
|
|
14 /******************************************************************************
|
|
15
|
|
16 Basic TCP server or client routines for sending raw data.
|
|
17
|
|
18 ******************************************************************************/
|
|
/******************************************************************************

    Listening side of the raw TCP protocol. Puts a ConnectionHandler into
    listening mode, registers it with a Vat and keeps a list of every
    connection accepted through it so data can be broadcast to all of them.

******************************************************************************/
class RawTCPListener
{
// NOTE(review): "public"/"private" are written without a trailing colon, so
// each applies only to the declaration that immediately follows it. The
// form is kept as-is so the visibility of the remaining members does not
// change for existing callers.
public
    Logger log;

    /**************************************************************************

        Installs the default handlers on mgr, starts listening on addr and
        registers mgr with sel.

        Params:
            mgr  = handler that owns the listening transport
            sel  = Vat that the listener and accepted connections are
                   registered with
            addr = local address to listen on

    **************************************************************************/
    this(ConnectionHandler mgr, Vat sel, IPv4Address addr)
    {
        // Initialise every field that accept() and broadcast() rely on
        // before the handler is handed to the Vat, so a connect event can
        // never observe a null vat, log or children.
        manager = mgr;
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpServer");
        log.info("log initialized");
        children = new CircularSeq!(ConnectionHandler);

        mgr.events(Event.Read);
        mgr.setOutgoingHandler(&Handlers.onSend);
        mgr.setIncomingHandler(&Handlers.onReceive);
        mgr.setConnectHandler(&accept);
        mgr.listen(addr);

        // Registration comes last; a failure is reported the same way
        // RawTCPClient.send reports it.
        if (!sel.addConnection(mgr))
            log.error("unable to register listener");
    }

    /**************************************************************************

        accept
        Connect handler installed on the listening ConnectionHandler.
        Accepts the pending connection into a fresh AsyncSocketConduit,
        wraps it in a new ConnectionHandler built from manager, registers it
        with the Vat for read events and remembers it in children.

        Params:
            cond = conduit of the listening socket; must be an
                   AsyncSocketConduit
            reg  = unused here

        Returns: always 0

    **************************************************************************/
    int accept(Conduit cond, RegisterD reg)
    {
        // A class cast yields null when cond is of another type; bail out
        // instead of dereferencing null.
        AsyncSocketConduit listener = cast(AsyncSocketConduit) cond;
        if (listener is null)
        {
            log.error("accept: conduit is not an AsyncSocketConduit");
            return 0;
        }

        AsyncSocketConduit newcond = new AsyncSocketConduit;
        listener.socket().accept(newcond.socket);
        ConnectionHandler h = ConnectionHandler.New(newcond, manager);
        h.events(Event.Read);
        if (!vat.addConnection(h))
            log.error("accept: unable to register connection");
        children.append(h);
        log.info("accepted new connection");
        return 0;
    }

    /**************************************************************************

        broadcast
        Offers outbuf to every accepted connection. For each connection
        whose appendOutBuffer call succeeds, the write event is added and
        the connection is registered with the Vat again.

        Params:
            outbuf = data to queue on each connection

        Returns: always 0

    **************************************************************************/
    int broadcast(char[] outbuf)
    {
        foreach(ConnectionHandler h; children)
        {
            if (h.appendOutBuffer(outbuf))
            {
                h.addEvent(Event.Write);
                if (!vat.addConnection(h))
                    log.error("broadcast: unable to register connection");
            }
        }
        return 0;
    }

    /**************************************************************************

        close
        Currently a no-op.

    **************************************************************************/
    void close()
    {

    }

private
    ConnectionHandler manager;
    Vat vat;
    CircularSeq!(ConnectionHandler) children;
}
|
|
74
|
|
/******************************************************************************

    Client side of the raw TCP protocol. Wraps a single ConnectionHandler,
    connects it to a remote address and queues outgoing data on it.

******************************************************************************/
class RawTCPClient
{
// NOTE(review): "public"/"private" are written without a trailing colon, so
// each applies only to the declaration that immediately follows it. The
// form is kept as-is so the visibility of the remaining members does not
// change for existing callers.
public
    Logger log;

    /**************************************************************************

        Installs the default handlers on mgr and remembers the Vat to
        register with. No connection is made here.

        Params:
            mgr  = handler that owns the client transport
            sel  = Vat the handler is registered with on connect/send
            evts = events the handler is initially interested in

    **************************************************************************/
    this(ConnectionHandler mgr, Vat sel, Event evts = Event.Read)
    {
        manager = mgr;
        manager.events(evts);
        connected = false;
        mgr.setOutgoingHandler(&Handlers.onSend);
        mgr.setIncomingHandler(&Handlers.onReceive);
        vat = sel;
        log = Log.lookup("dreactor.protocol.RawTcpClient");
    }

    /**************************************************************************

        connect
        Connects the handler's transport to addr, registers the handler
        with the Vat and marks the client as connected.

        Params:
            addr = remote address to connect to

        Returns: always 0

    **************************************************************************/
    int connect(IPv4Address addr)
    {
        (cast(AsyncSocketConduit) manager.transport()).connect(addr);
        // Report a failed registration the same way send() does; the
        // return value and connected flag keep their original behaviour.
        if (!vat.addConnection(manager))
            log.error("connect: unable to register mgr");
        connected = true;
        log.info("connected to {}", addr);
        return 0;
    }

    /**************************************************************************

        send
        User-called function to send data to the counterpart at the other
        end of the connection. This sets up the connection manager to send
        data as the socket becomes free.

        Params:
            outbuf = data to queue for sending
            addr   = address to connect to first when the client is not
                     yet connected; may be null once connected

        Returns: -1 when the client is not connected and either no address
                 was supplied or connect reported failure; 0 otherwise

    **************************************************************************/
    int send(char[] outbuf, IPv4Address addr = null)
    {
        if (!connected)
        {
            // Without an address there is nothing to connect to. Refuse
            // rather than queue data and register an unconnected handler
            // for write events.
            if (addr is null)
            {
                log.error("send: not connected and no address given");
                return -1;
            }

            log.info("send: not connected, connecting");
            if (0 > connect(addr))
            {
                log.error("send: unable to connect");
                return -1;
            }
        }
        if (manager.appendOutBuffer(outbuf))
        {
            manager.addEvent(Event.Write);
            if (!vat.addConnection(manager))
            {
                log.error("unable to register mgr");
            }
        }
        return 0;
    }

private
    ConnectionHandler manager;
    Vat vat;
    bool connected;
}
|
|
137
|
|
138
|
|
/******************************************************************************

    Default event handlers shared by the listener and the client.

******************************************************************************/
class Handlers
{
    /**************************************************************************

        onSend
        OutgoingHandlerD
        Registered as the response to the socket-writable event.

        Takes the handler's next buffer and writes it to the transport.
        When there is no buffer, or when addOffset reports false after a
        successful write, the Write event is removed and REREGISTER is
        returned. A write that returns Eof or any other non-positive value
        is only logged. Every other path returns REMAIN.

    ***************************************************************************/
    public static int onSend(ConnectionHandler h)
    {
        Logger log = Log.lookup("Handlers.onSend");

        log.info("top of onSend");
        char[] pending = h.nextBuffer();

        // Nothing queued: stop listening for writable events.
        if (pending is null)
        {
            h.remEvent(Event.Write);
            return REREGISTER;
        }

        int written = h.transport.write(pending);
        if (written > 0)
        {
            if (h.addOffset(written))
                return REMAIN;

            // addOffset returned false: drop the Write interest.
            h.remEvent(Event.Write);
            return REREGISTER;
        }

        if (written == AsyncSocketConduit.Eof)
            log.error("Select said socket was writable, but sent 0 bytes");
        else
            log.error("Socket send return ERR");

        return REMAIN;
    }

    /**************************************************************************

        onReceive
        IncomingHandlerD
        Default incoming data handler. Should be replaced with something
        useful.

        Reads up to 8192 bytes from the transport into a stack buffer and
        logs either the received data or the non-positive read result.
        Always returns REMAIN.

        NOTE(review): a non-positive read (e.g. end of stream) still
        returns REMAIN; confirm whether the handler should be unregistered
        in that case.

    **************************************************************************/
    public static int onReceive(ConnectionHandler h)
    {
        Logger log = Log.lookup("Handlers.onReceive");

        char[8192] inbuf;
        int amt = h.transport.read(inbuf);

        if (amt <= 0)
            log.error("Received no data, err = {}", amt);
        else
            log.info("Received {} byte Buffer: {}", amt, inbuf[0 .. amt]);

        return REMAIN;
    }
}
|