comparison tango/example/networking/selector.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
comparison
equal deleted inserted replaced
131:5825d48b27d1 132:1700239cab2e
1 /*******************************************************************************
2 copyright: Copyright (c) 2006 Juan Jose Comellas. All rights reserved
3 license: BSD style: $(LICENSE)
4 author: Juan Jose Comellas <juanjo@comellas.com.ar>
5 *******************************************************************************/
6
7 private
8 {
9 version (Posix)
10 {
11 import tango.io.selector.PollSelector;
12 }
13 else version (linux)
14 {
15 import tango.io.selector.EpollSelector;
16 import tango.sys.linux.linux;
17 }
18
19 import tango.io.selector.model.ISelector;
20 import tango.io.selector.Selector;
21 import tango.io.selector.SelectSelector;
22 import tango.io.selector.SelectorException;
23 import tango.io.Conduit;
24 import tango.net.Socket;
25 import tango.net.SocketConduit;
26 import tango.net.ServerSocket;
27 import tango.time.Clock;
28 import tango.util.log.Log;
29 import tango.util.log.ConsoleAppender;
30 import tango.util.log.DateLayout;
31 import tango.text.convert.Sprint;
32 import tango.core.Exception;
33 import tango.core.Thread;
34 import tango.sys.Common;
35 import tango.stdc.errno;
36 }
37
38
// Tunables shared by the server loop and the client thread.
const
{
    // Passed as the first argument to ISelector.open().
    uint   HANDLE_COUNT = 4;
    // Passed as the second argument to ISelector.open().
    uint   EVENT_COUNT  = 4;
    // Number of PING messages the client thread sends.
    uint   LOOP_COUNT   = 50000;
    // Address the server socket binds to and the client connects to.
    char[] SERVER_ADDR  = "127.0.0.1";
    // TCP port used together with SERVER_ADDR.
    ushort SERVER_PORT  = 4000;
    // Size, in bytes, of the receive buffers on both sides.
    uint   MAX_LENGTH   = 16;
}
45
/**
 * Program entry point.
 *
 * Attaches a console appender to the "selector" logger and then runs
 * testSelector() once for each selector implementation compiled in for the
 * current platform: SelectSelector always, PollSelector under version (Posix)
 * and EpollSelector under version (linux).
 *
 * Returns: always 0.
 */
int main(char[][] args)
{
    Logger        logger = Log.getLogger("selector");
    Sprint!(char) format = new Sprint!(char)(256);

    logger.addAppender(new ConsoleAppender(new DateLayout()));

    ISelector selector;

    // select()-based implementation.
    for (int pass = 1; pass <= 1; ++pass)
    {
        logger.info(format("Pass {0}: Testing the select-based selector", pass));
        selector = new SelectSelector();
        testSelector(selector);
    }

    // poll()-based implementation.
    version (Posix)
    {
        for (int pass = 1; pass <= 1; ++pass)
        {
            logger.info(format("Pass {0}: Testing the poll-based selector", pass));
            selector = new PollSelector();
            testSelector(selector);
        }
    }

    // epoll-based implementation.
    version (linux)
    {
        for (int pass = 1; pass <= 1; ++pass)
        {
            logger.info(format("Pass {0}: Testing the epoll-based selector", pass));
            selector = new EpollSelector();
            testSelector(selector);
        }
    }

    return 0;
}
87
88
/**
 * Create a server socket and run the Selector on it.
 *
 * The selector is opened with HANDLE_COUNT / EVENT_COUNT, a client thread
 * running clientThreadFunc() is started, and a ServerSocket bound to
 * SERVER_ADDR:SERVER_PORT is registered for read events. The loop then
 * accepts connections, reads each incoming message and answers it with
 * "PONG", switching the client's registration between Event.Read and
 * Event.Write. The loop ends when select() reports no events within the
 * one-second timeout.
 *
 * Exceptions raised while serving are logged, not propagated. The listening
 * socket is always detached before the statistics are logged, so a failure
 * in one run does not keep SERVER_PORT bound for the next one.
 *
 * Params:
 *     selector = selector implementation to exercise; it is opened here and
 *                closed again before the function returns.
 */
void testSelector(ISelector selector)
{
    Logger        log    = Log.getLogger("selector.server");
    Sprint!(char) sprint = new Sprint!(char)(512);

    // Statistics reported once the loop has finished.
    uint    connectCount        = 0;
    uint    receiveCount        = 0;
    uint    sendCount           = 0;
    uint    failedConnectCount  = 0;
    uint    failedReceiveCount  = 0;
    uint    failedSendCount     = 0;
    uint    closeCount          = 0;
    uint    errorCount          = 0;
    Time    start               = Clock.now;
    Thread  clientThread;

    selector.open(HANDLE_COUNT, EVENT_COUNT);

    clientThread = new Thread(&clientThreadFunc);
    clientThread.start();

    try
    {
        TimeSpan        timeout      = TimeSpan.seconds(1);
        InternetAddress addr         = new InternetAddress(SERVER_ADDR, SERVER_PORT);
        ServerSocket    serverSocket = new ServerSocket(addr, 5);

        // The inner try/finally guarantees that the listening socket is
        // released even when the loop below throws; an exception thrown by
        // the detach itself still reaches the catch clauses of the outer try.
        try
        {
            SocketConduit   clientSocket;
            char[MAX_LENGTH] buffer;
            int             eventCount;
            uint            count;
            int             i = 0;

            debug (selector)
                log.trace("Registering server socket to Selector");

            selector.register(serverSocket, Event.Read);

            while (true)
            {
                debug (selector)
                    log.trace(sprint("[{0}] Waiting for events from Selector", i));

                eventCount = selector.select(timeout);

                debug (selector)
                    log.trace(sprint("[{0}] {1} events received from Selector", i, eventCount));

                if (eventCount > 0)
                {
                    foreach (SelectionKey selectionKey; selector.selectedSet())
                    {
                        debug (selector)
                            log.trace(sprint("[{0}] Event mask for socket {1} is 0x{2:x4}",
                                             i, cast(int) selectionKey.conduit.fileHandle(),
                                             cast(uint) selectionKey.events));

                        if (selectionKey.isReadable())
                        {
                            if (selectionKey.conduit is serverSocket)
                            {
                                // Readable listening socket: a client is connecting.
                                debug (selector)
                                    log.trace(sprint("[{0}] New connection from client", i));

                                clientSocket = serverSocket.accept();
                                if (clientSocket !is null)
                                {
                                    selector.register(clientSocket, Event.Read);
                                    connectCount++;
                                }
                                else
                                {
                                    debug (selector)
                                        log.trace(sprint("[{0}] New connection attempt failed", i));
                                    failedConnectCount++;
                                }
                            }
                            else
                            {
                                // Reading from a client socket
                                debug (selector)
                                    log.trace(sprint("[{0}] Receiving message from client", i));

                                count = (cast(SocketConduit) selectionKey.conduit).read(buffer);
                                if (count != IConduit.Eof)
                                {
                                    debug (selector)
                                        log.trace(sprint("[{0}] Received {1} from client ({2} bytes)",
                                                         i, buffer[0..count], count));
                                    // Message received: wait until the reply can be written.
                                    selector.reregister(selectionKey.conduit, Event.Write);
                                    receiveCount++;
                                }
                                else
                                {
                                    debug (selector)
                                        log.trace(sprint("[{0}] Handle {1} was closed; removing it from Selector",
                                                         i, cast(int) selectionKey.conduit.fileHandle()));
                                    selector.unregister(selectionKey.conduit);
                                    (cast(SocketConduit) selectionKey.conduit).close();
                                    failedReceiveCount++;
                                    continue;
                                }
                            }
                        }

                        if (selectionKey.isWritable())
                        {
                            debug (selector)
                                log.trace(sprint("[{0}] Sending PONG to client", i));

                            count = (cast(SocketConduit) selectionKey.conduit).write("PONG");
                            if (count != IConduit.Eof)
                            {
                                debug (selector)
                                    log.trace(sprint("[{0}] Sent PONG to client ({1} bytes)", i, count));

                                // Reply sent: go back to waiting for the next message.
                                selector.reregister(selectionKey.conduit, Event.Read);
                                sendCount++;
                            }
                            else
                            {
                                debug (selector)
                                    log.trace(sprint("[{0}] Handle {1} was closed; removing it from Selector",
                                                     i, cast(int) selectionKey.conduit.fileHandle()));
                                selector.unregister(selectionKey.conduit);
                                (cast(SocketConduit) selectionKey.conduit).close();
                                failedSendCount++;
                                continue;
                            }
                        }

                        if (selectionKey.isError() || selectionKey.isHangup() || selectionKey.isInvalidHandle())
                        {
                            char[] status;

                            if (selectionKey.isHangup())
                            {
                                closeCount++;
                                status = "Hangup";
                            }
                            else
                            {
                                errorCount++;
                                if (selectionKey.isInvalidHandle())
                                    status = "Invalid request";
                                else
                                    status = "Error";
                            }

                            debug (selector)
                            {
                                log.trace(sprint("[{0}] {1} in handle {2} from Selector",
                                                 i, status, cast(int) selectionKey.conduit.fileHandle()));

                                log.trace(sprint("[{0}] Unregistering handle {1} from Selector",
                                                 i, cast(int) selectionKey.conduit.fileHandle()));
                            }
                            selector.unregister(selectionKey.conduit);
                            (cast(Conduit) selectionKey.conduit).close();

                            if (selectionKey.conduit !is serverSocket)
                            {
                                continue;
                            }
                            else
                            {
                                // Only leaves the foreach over the selected set;
                                // the surrounding while loop goes on to the next
                                // select() call.
                                break;
                            }
                        }
                    }
                }
                else
                {
                    debug (selector)
                        log.trace(sprint("[{0}] No more pending events in Selector; aborting", i));
                    break;
                }
                i++;

                // Thread.sleep(1.0);
                /*
                if (i % 100 == 0)
                {
                    fullCollect();
                    getStats(gc)
                }
                */
            }
        }
        finally
        {
            serverSocket.socket().detach;
        }
    }
    catch (SelectorException e)
    {
        log.error(sprint("Selector exception caught:\n{0}", e.toString()));
    }
    catch (Exception e)
    {
        log.error(sprint("Exception caught:\n{0}", e.toString()));
    }

    log.info(sprint("Success: connect={0}; recv={1}; send={2}; close={3}",
                    connectCount, receiveCount, sendCount, closeCount));
    log.info(sprint("Failure: connect={0}, recv={1}; send={2}; error={3}",
                    failedConnectCount, failedReceiveCount, failedSendCount, errorCount));

    log.info(sprint("Total time: {0} ms", cast(uint) (Clock.now - start).millis));

    clientThread.join();

    selector.close;
}
302
303
/**
 * Thread that creates a client socket and sends messages to the server socket.
 *
 * After a 10 ms start-up delay the thread connects to SERVER_ADDR:SERVER_PORT
 * and performs up to LOOP_COUNT round trips: it writes "PING" and then reads
 * the reply. A write or read interrupted by a signal (SocketException with
 * errno == EINTR) is retried; any other exception ends the thread and is
 * logged. The loop stops early when a write or read returns IConduit.Eof.
 *
 * The socket is closed on every exit path, including when connect() or a
 * later operation throws.
 */
void clientThreadFunc()
{
    Logger        log    = Log.getLogger("selector.client");
    Sprint!(char) sprint = new Sprint!(char)(256);
    SocketConduit socket = new SocketConduit();

    // NOTE(review): presumably gives the server time to start listening - confirm.
    Thread.sleep(0.010); // 10 milliseconds

    try
    {
        try
        {
            InternetAddress  addr = new InternetAddress(SERVER_ADDR, SERVER_PORT);
            char[MAX_LENGTH] buffer;
            uint             count;
            int              i = 0;

            debug (selector)
                log.trace(sprint("[{0}] Connecting to server", i));

            socket.connect(addr);

            for (i = 1; i <= LOOP_COUNT; i++)
            {
                debug (selector)
                    log.trace(sprint("[{0}] Sending PING to server", i));

                // Retry the write for as long as it is interrupted by a signal.
                while (true)
                {
                    try
                    {
                        count = socket.write("PING");
                        break;
                    }
                    catch (SocketException e)
                    {
                        if (errno != EINTR)
                            throw e;
                    }
                }
                if (count != IConduit.Eof)
                {
                    debug (selector)
                    {
                        log.trace(sprint("[{0}] Sent PING to server ({1} bytes)", i, count));

                        log.trace(sprint("[{0}] Receiving message from server", i));
                    }
                    // Retry the read for as long as it is interrupted by a signal.
                    while (true)
                    {
                        try
                        {
                            count = socket.read(buffer);
                            break;
                        }
                        catch (SocketException e)
                        {
                            if (errno != EINTR)
                                throw e;
                        }
                    }
                    if (count != IConduit.Eof)
                    {
                        debug (selector)
                            log.trace(sprint("[{0}] Received {1} from server ({2} bytes)",
                                             i, buffer[0..count], count));
                    }
                    else
                    {
                        debug (selector)
                            log.trace(sprint("[{0}] Handle {1} was closed; aborting",
                                             i, cast(int) socket.fileHandle()));
                        break;
                    }
                }
                else
                {
                    debug (selector)
                        log.trace(sprint("[{0}] Handle {1} was closed; aborting",
                                         i, cast(int) socket.fileHandle()));
                    break;
                }
            }
            // Orderly shutdown only when the exchange finished without an
            // exception.
            socket.shutdown();
        }
        finally
        {
            // Release the socket on every path, including failures above.
            socket.close();
        }
    }
    catch (Exception e)
    {
        log.error(sprint("Exception caught:\n{0}", e.toString()));
    }
    debug (selector)
        log.trace("Leaving thread");
}