Mercurial > projects > ldc
diff tango/example/networking/selector.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/tango/example/networking/selector.d Fri Jan 11 17:57:40 2008 +0100 @@ -0,0 +1,399 @@ +/******************************************************************************* + copyright: Copyright (c) 2006 Juan Jose Comellas. All rights reserved + license: BSD style: $(LICENSE) + author: Juan Jose Comellas <juanjo@comellas.com.ar> +*******************************************************************************/ + +private +{ + version (Posix) + { + import tango.io.selector.PollSelector; + } + else version (linux) + { + import tango.io.selector.EpollSelector; + import tango.sys.linux.linux; + } + + import tango.io.selector.model.ISelector; + import tango.io.selector.Selector; + import tango.io.selector.SelectSelector; + import tango.io.selector.SelectorException; + import tango.io.Conduit; + import tango.net.Socket; + import tango.net.SocketConduit; + import tango.net.ServerSocket; + import tango.time.Clock; + import tango.util.log.Log; + import tango.util.log.ConsoleAppender; + import tango.util.log.DateLayout; + import tango.text.convert.Sprint; + import tango.core.Exception; + import tango.core.Thread; + import tango.sys.Common; + import tango.stdc.errno; +} + + +const uint HANDLE_COUNT = 4; +const uint EVENT_COUNT = 4; +const uint LOOP_COUNT = 50000; +const char[] SERVER_ADDR = "127.0.0.1"; +const ushort SERVER_PORT = 4000; +const uint MAX_LENGTH = 16; + +int main(char[][] args) +{ + Logger log = Log.getLogger("selector"); + Sprint!(char) sprint = new Sprint!(char)(256); + + log.addAppender(new ConsoleAppender(new DateLayout())); + + ISelector selector; + + for (int i = 0; i < 1; i++) + { + // Testing the SelectSelector + log.info(sprint("Pass {0}: Testing the select-based selector", i + 1)); + selector = new SelectSelector(); + testSelector(selector); + } + + // Testing the PollSelector + version (Posix) + { + for (int i = 0; i < 1; i++) + { + log.info(sprint("Pass {0}: Testing the poll-based selector", i + 1)); + selector = new PollSelector(); + testSelector(selector); + } + } + + // Testing the EpollSelector + version (linux) + { + for (int i = 0; i < 1; i++) + { + log.info(sprint("Pass {0}: Testing the epoll-based selector", i + 1)); + selector = new EpollSelector(); + testSelector(selector); + } + } + + return 0; +} + + +/** + * Create a server socket and run the Selector on it. + */ +void testSelector(ISelector selector) +{ + Logger log = Log.getLogger("selector.server"); + Sprint!(char) sprint = new Sprint!(char)(512); + + uint connectCount = 0; + uint receiveCount = 0; + uint sendCount = 0; + uint failedConnectCount = 0; + uint failedReceiveCount = 0; + uint failedSendCount = 0; + uint closeCount = 0; + uint errorCount = 0; + Time start = Clock.now; + Thread clientThread; + + selector.open(HANDLE_COUNT, EVENT_COUNT); + + clientThread = new Thread(&clientThreadFunc); + clientThread.start(); + + try + { + TimeSpan timeout = TimeSpan.seconds(1); + InternetAddress addr = new InternetAddress(SERVER_ADDR, SERVER_PORT); + ServerSocket serverSocket = new ServerSocket(addr, 5); + SocketConduit clientSocket; + char[MAX_LENGTH] buffer; + int eventCount; + uint count; + int i = 0; + + debug (selector) + log.trace("Registering server socket to Selector"); + + selector.register(serverSocket, Event.Read); + + while (true) + { + debug (selector) + log.trace(sprint("[{0}] Waiting for events from Selector", i)); + + eventCount = selector.select(timeout); + + debug (selector) + log.trace(sprint("[{0}] {1} events received from Selector", i, eventCount)); + + if (eventCount > 0) + { + foreach (SelectionKey selectionKey; selector.selectedSet()) + { + debug (selector) + log.trace(sprint("[{0}] Event mask for socket {1} is 0x{2:x4}", + i, cast(int) selectionKey.conduit.fileHandle(), + cast(uint) selectionKey.events)); + + if (selectionKey.isReadable()) + { + if (selectionKey.conduit is serverSocket) + { + debug (selector) + log.trace(sprint("[{0}] New connection from client", i)); + + clientSocket = serverSocket.accept(); + if (clientSocket !is null) + { + selector.register(clientSocket, Event.Read); + connectCount++; + } + else + { + debug (selector) + log.trace(sprint("[{0}] New connection attempt failed", i)); + failedConnectCount++; + } + } + else + { + // Reading from a client socket + debug (selector) + log.trace(sprint("[{0}] Receiving message from client", i)); + + count = (cast(SocketConduit) selectionKey.conduit).read(buffer); + if (count != IConduit.Eof) + { + debug (selector) + log.trace(sprint("[{0}] Received {1} from client ({2} bytes)", + i, buffer[0..count], count)); + selector.reregister(selectionKey.conduit, Event.Write); + receiveCount++; + } + else + { + debug (selector) + log.trace(sprint("[{0}] Handle {1} was closed; removing it from Selector", + i, cast(int) selectionKey.conduit.fileHandle())); + selector.unregister(selectionKey.conduit); + (cast(SocketConduit) selectionKey.conduit).close(); + failedReceiveCount++; + continue; + } + } + } + + if (selectionKey.isWritable()) + { + debug (selector) + log.trace(sprint("[{0}] Sending PONG to client", i)); + + count = (cast(SocketConduit) selectionKey.conduit).write("PONG"); + if (count != IConduit.Eof) + { + debug (selector) + log.trace(sprint("[{0}] Sent PONG to client ({1} bytes)", i, count)); + + selector.reregister(selectionKey.conduit, Event.Read); + sendCount++; + } + else + { + debug (selector) + log.trace(sprint("[{0}] Handle {1} was closed; removing it from Selector", + i, selectionKey.conduit.fileHandle())); + selector.unregister(selectionKey.conduit); + (cast(SocketConduit) selectionKey.conduit).close(); + failedSendCount++; + continue; + } + } + + if (selectionKey.isError() || selectionKey.isHangup() || selectionKey.isInvalidHandle()) + { + char[] status; + + if (selectionKey.isHangup()) + { + closeCount++; + status = "Hangup"; + } + else + { + errorCount++; + if (selectionKey.isInvalidHandle()) + status = "Invalid request"; + else + status = "Error"; + } + + debug (selector) + { + log.trace(sprint("[{0}] {1} in handle {2} from Selector", + i, status, cast(int) selectionKey.conduit.fileHandle())); + + log.trace(sprint("[{0}] Unregistering handle {1} from Selector", + i, cast(int) selectionKey.conduit.fileHandle())); + } + selector.unregister(selectionKey.conduit); + (cast(Conduit) selectionKey.conduit).close(); + + if (selectionKey.conduit !is serverSocket) + { + continue; + } + else + { + break; + } + } + } + } + else + { + debug (selector) + log.trace(sprint("[{0}] No more pending events in Selector; aborting", i)); + break; + } + i++; + + // Thread.sleep(1.0); + /* + if (i % 100 == 0) + { + fullCollect(); + getStats(gc) + } + */ + } + + serverSocket.socket().detach; + } + catch (SelectorException e) + { + log.error(sprint("Selector exception caught:\n{0}", e.toString())); + } + catch (Exception e) + { + log.error(sprint("Exception caught:\n{0}", e.toString())); + } + + log.info(sprint("Success: connect={0}; recv={1}; send={2}; close={3}", + connectCount, receiveCount, sendCount, closeCount)); + log.info(sprint("Failure: connect={0}, recv={1}; send={2}; error={3}", + failedConnectCount, failedReceiveCount, failedSendCount, errorCount)); + + log.info(sprint("Total time: {0} ms", cast(uint) (Clock.now - start).millis)); + + clientThread.join(); + + selector.close; +} + + +/** + * Thread that creates a client socket and sends messages to the server socket. + */ +void clientThreadFunc() +{ + Logger log = Log.getLogger("selector.client"); + Sprint!(char) sprint = new Sprint!(char)(256); + SocketConduit socket = new SocketConduit(); + + Thread.sleep(0.010); // 10 milliseconds + + try + { + InternetAddress addr = new InternetAddress(SERVER_ADDR, SERVER_PORT); + char[MAX_LENGTH] buffer; + uint count; + int i; + + debug (selector) + log.trace(sprint("[{0}] Connecting to server", i)); + + socket.connect(addr); + + for (i = 1; i <= LOOP_COUNT; i++) + { + debug (selector) + log.trace(sprint("[{0}] Sending PING to server", i)); + + while (true) + { + try + { + count = socket.write("PING"); + break; + } + catch (SocketException e) + { + if (errno != EINTR) + throw e; + } + } + if (count != IConduit.Eof) + { + debug (selector) + { + log.trace(sprint("[{0}] Sent PING to server ({1} bytes)", i, count)); + + log.trace(sprint("[{0}] Receiving message from server", i)); + } + while (true) + { + try + { + count = socket.read(buffer); + break; + } + catch (SocketException e) + { + if (errno != EINTR) + throw e; + } + } + if (count != IConduit.Eof) + { + debug (selector) + log.trace(sprint("[{0}] Received {1} from server ({2} bytes)", + i, buffer[0..count], count)); + } + else + { + debug (selector) + log.trace(sprint("[{0}] Handle was closed; aborting", + i, socket.fileHandle())); + break; + } + } + else + { + debug (selector) + log.trace(sprint("[{0}] Handle {1} was closed; aborting", + i, socket.fileHandle())); + break; + } + } + socket.shutdown(); + socket.close(); + } + catch (Exception e) + { + log.error(sprint("Exception caught:\n{0}", e.toString())); + } + debug (selector) + log.trace("Leaving thread"); + + return 0; +}