comparison dreactor/core/SelectLoop.d @ 3:e3dbc9208822

basic tests working
author rick@minifunk
date Tue, 08 Jul 2008 11:21:09 -0400
parents d3374d553986
children
comparison
equal deleted inserted replaced
2:d3374d553986 3:e3dbc9208822
10 10
11 *******************************************************************************/ 11 *******************************************************************************/
12 12
13 module dreactor.core.SelectLoop; 13 module dreactor.core.SelectLoop;
14 14
15 import dreactor.transport.AsyncSocketConduit;
16 import tango.io.selector.Selector; 15 import tango.io.selector.Selector;
17 import tango.io.selector.model.ISelector; 16 import tango.io.selector.model.ISelector;
18 import tango.core.Exception; 17 import tango.core.Exception;
19 import tango.core.Thread; 18 import tango.core.Thread;
20 import tango.core.Atomic; 19 import tango.core.Atomic;
21 import tango.util.collection.LinkSeq; 20 import tango.util.collection.LinkSeq;
22 import tango.util.log.Log; 21 import tango.util.log.Log;
23 import tango.util.log.Configurator;
24 22
25 import dreactor.core.ConnectionHandler; 23 import dreactor.transport.AsyncSocketConduit;
24 import dreactor.core.ConnectionHandler;
25 import dreactor.util.ThreadSafeQueue;
26 26
27 Logger log = Log.getLogger("dreactor.core.SelectLoop"); 27 Logger log;
28
29 enum : int {UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
28 30
29 static char[] version_string = "SelectLoop.d 0.1 2008-05-31"; 31 static char[] version_string = "SelectLoop.d 0.1 2008-05-31";
30 32
31 class SelectLoop 33 class SelectLoop
32 { 34 {
33 private 35 private
34 Thread thread; 36 Thread thread;
35 bool running; 37 bool running;
36 Atomic!(int) pending; 38 Atomic!(int) pending;
37 39
38 ThreadSafeQueue!(ConnectionHandler) freshList; 40 ThreadSafeQueue!(ConnectionHandler) freshList;
39 //LinkSeq!(ConnectionHandler) connList; probably not necessary 41 ThreadSafeQueue!(ConnectionHandler) remList;
40 42 public
41 public
42 this() 43 this()
43 { 44 {
44 freshList = new ThreadSafeQueue!(ConnectionHandler); 45 freshList = new ThreadSafeQueue!(ConnectionHandler);
45 //connList = new LinkSeq!(ConnectionHandler); 46 remList = new ThreadSafeQueue!(ConnectionHandler);
46 Configurator(); 47 log = Log.lookup("dreactor.core.SelectLoop");
47 } 48 }
48 49
49 void run() 50 void run()
50 { 51 {
51 running = true; 52 running = true;
58 running = false; 59 running = false;
59 } 60 }
60 61
61 bool addConnection(ConnectionHandler handler) 62 bool addConnection(ConnectionHandler handler)
62 { 63 {
64 log.trace("adding handler");
63 return freshList.push(handler); 65 return freshList.push(handler);
66 }
67
68 bool remConnection(ConnectionHandler handler)
69 {
70 return remList.push(handler);
64 } 71 }
65 72
66 private 73 private
67 void eventLoop() 74 void eventLoop()
68 { 75 {
69 auto selector = new Selector(); 76 auto selector = new Selector();
70 selector.open(); 77 selector.open();
71 auto format = Log.format;
72 do 78 do
73 { 79 {
74 auto eventCount = selector.select(10); 80 auto eventCount = selector.select(0.01);
75 81
76 if (eventCount > 0) 82 if (eventCount > 0)
77 { 83 {
78 // process events 84 // process events
79 foreach (SelectionKey key; selector.selectedSet()) 85 foreach (SelectionKey key; selector.selectedSet())
80 { 86 {
81 if (key.isReadable()) 87 if (key.isReadable())
82 { 88 {
83 // incoming data 89 // incoming data
90 log.trace("Read event fired");
84 auto conn = cast(ConnectionHandler) key.attachment; 91 auto conn = cast(ConnectionHandler) key.attachment;
85 if ( ConnectionHandler.listening == conn.getState() ) 92 if ( ConnectionHandler.State.listening == conn.getState() )
86 conn.handleConnection(conn.transport, addConnection); 93 conn.handleConnection(conn.transport, &addConnection);
87 else if ( ! conn.handleIncoming(conn.transport)) 94 else
88 unregister(conn.transport); 95 processReturn(conn.handleIncoming(), selector, conn);
89 } 96 }
90 else if (key.isWritable()) 97 else if (key.isWritable())
91 { 98 {
99 log.trace("Write event fired");
92 auto conn = cast(ConnectionHandler) key.attachment; 100 auto conn = cast(ConnectionHandler) key.attachment;
93 if ( ! conn.handleOutgoing(conn.transport)) 101 processReturn(conn.handleOutgoing(), selector, conn);
94 unregister(conn.transport);
95 } 102 }
96 else if (key.isHangup()) 103 else if (key.isHangup())
97 { 104 {
98 auto conn = cast(ConnectionHandler) key.attachment; 105 auto conn = cast(ConnectionHandler) key.attachment;
99 if ( ! conn.handleDisconnect(conn.transport)) 106 processReturn(conn.handleDisconnect(), selector, conn);
100 unregister(conn.transport);
101 } 107 }
102 else if (key.isError() || key.isInvalidHandle()) 108 else if (key.isError() || key.isInvalidHandle())
103 { 109 {
110 log.trace("Error event fired");
104 // error, close connection 111 // error, close connection
105 auto conn = cast(ConnectionHandler) key.attachment; 112 auto conn = cast(ConnectionHandler) key.attachment;
106 if ( ! conn.handleError(conn.transport, key.isInvalidHandle())) 113 conn.handleError(&remConnection);
107 selector.unregister(conn.transport);
108 } 114 }
109 } 115 }
110 } 116 }
111 else if (eventCount == 0) 117 else if (eventCount == 0)
112 { 118 {
113 /* can't think of anything useful to do here. */ 119 /* can't think of anything useful to do here. */
114 } 120 }
115 else 121 else
116 { 122 {
117 log.error(format("Selector.select returned {}", eventCount)); 123 log.error("Selector.select returned {}", eventCount);
118 } 124 }
119
120 //add Conduits to listener 125 //add Conduits to listener
121 freshList.processAll( (ConnectionHandler h) 126 freshList.processAll( (ref ConnectionHandler h)
122 { 127 {
123 selector.register(conn.transport, conn.events()); 128 log.trace("reregistering transport for event {}", h.events());
124 //connList.append(conn); 129 selector.reregister(h.transport, h.events(), h);
125 return 1; 130 return 1;
126 }); 131 });
127 //freshList.processAll(reg); 132 remList.processAll( (ref ConnectionHandler h)
133 {
134 selector.unregister(h.transport);
135 return 1;
136 });
128 137
129 } while (running) 138 } while (running)
139
140 log.trace("done with while loop");
141 }
142
143 void processReturn(int result, Selector s, ConnectionHandler h)
144 {
145 switch(result)
146 {
147 case UNREGISTER:
148 s.unregister(h.transport);
149 break;
150 case REMAIN:
151 //this space intentially left blank
152 break;
153 case REGISTER:
154 s.register(h.transport, h.events(), h);
155 break;
156 case REREGISTER:
157 s.reregister(h.transport, h.events(), h);
158 break;
159 default:
160 log.error("unknown return value");
161 }
130 } 162 }
131 } 163 }