0
|
1 /*******************************************************************************
|
|
2
|
|
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved
|
|
4
|
|
5 license: BSD style: $(LICENSE)
|
|
6
|
|
7 version: Initial release v0.1 : May 2008
|
|
8
|
|
9 author: Rick Richardson
|
|
10
|
|
11 *******************************************************************************/
|
|
12
|
5
|
13 module dreactor.core.Vat;
|
0
|
14
|
|
15 import tango.io.selector.Selector;
|
|
16 import tango.io.selector.model.ISelector;
|
|
17 import tango.core.Exception;
|
|
18 import tango.core.Thread;
|
|
19 import tango.core.Atomic;
|
|
20 import tango.util.collection.LinkSeq;
|
|
21 import tango.util.log.Log;
|
|
22
|
3
|
23 import dreactor.transport.AsyncSocketConduit;
|
6
|
24 import dreactor.core.Dispatcher;
|
3
|
25 import dreactor.util.ThreadSafeQueue;
|
0
|
26
|
3
|
27 Logger log;
|
|
28
|
8
|
29 enum : int {CLOSE = -2, UNREGISTER = -1, REMAIN = 0, REGISTER = 1, REREGISTER = 2};
|
0
|
30
|
5
|
31 static char[] version_string = "Vat.d 0.1 2008-05-31";
|
0
|
32
|
5
|
33 class Vat
|
0
|
34 {
|
|
35 private
|
|
36 Thread thread;
|
|
37 bool running;
|
|
38 Atomic!(int) pending;
|
3
|
39
|
6
|
40 ThreadSafeQueue!(Dispatcher) freshList;
|
|
41 ThreadSafeQueue!(Dispatcher) remList;
|
3
|
42 public
|
0
|
43 this()
|
|
44 {
|
6
|
45 freshList = new ThreadSafeQueue!(Dispatcher);
|
|
46 remList = new ThreadSafeQueue!(Dispatcher);
|
5
|
47 log = Log.lookup("dreactor.core.Vat");
|
0
|
48 }
|
|
49
|
|
50 void run()
|
|
51 {
|
|
52 running = true;
|
|
53 thread = new Thread(&eventLoop);
|
|
54 thread.start();
|
|
55 }
|
|
56
|
|
57 void exit()
|
|
58 {
|
|
59 running = false;
|
|
60 }
|
|
61
|
9
|
62 void wait()
|
|
63 {
|
|
64 thread.join();
|
|
65 }
|
|
66
|
6
|
67 bool addConnection(Dispatcher handler)
|
0
|
68 {
|
3
|
69 log.trace("adding handler");
|
0
|
70 return freshList.push(handler);
|
|
71 }
|
3
|
72
|
6
|
73 bool remConnection(Dispatcher handler)
|
3
|
74 {
|
|
75 return remList.push(handler);
|
|
76 }
|
0
|
77
|
|
78 private
|
|
79 void eventLoop()
|
|
80 {
|
|
81 auto selector = new Selector();
|
|
82 selector.open();
|
|
83 do
|
|
84 {
|
3
|
85 auto eventCount = selector.select(0.01);
|
0
|
86
|
|
87 if (eventCount > 0)
|
|
88 {
|
|
89 // process events
|
|
90 foreach (SelectionKey key; selector.selectedSet())
|
|
91 {
|
|
92 if (key.isReadable())
|
|
93 {
|
|
94 // incoming data
|
3
|
95 log.trace("Read event fired");
|
6
|
96 auto conn = cast(Dispatcher) key.attachment;
|
|
97 if ( Dispatcher.State.listening == conn.getState() )
|
3
|
98 conn.handleConnection(conn.transport, &addConnection);
|
|
99 else
|
|
100 processReturn(conn.handleIncoming(), selector, conn);
|
0
|
101 }
|
|
102 else if (key.isWritable())
|
|
103 {
|
3
|
104 log.trace("Write event fired");
|
6
|
105 auto conn = cast(Dispatcher) key.attachment;
|
3
|
106 processReturn(conn.handleOutgoing(), selector, conn);
|
0
|
107 }
|
|
108 else if (key.isHangup())
|
|
109 {
|
5
|
110 log.trace("Hangup event fired");
|
6
|
111 auto conn = cast(Dispatcher) key.attachment;
|
3
|
112 processReturn(conn.handleDisconnect(), selector, conn);
|
0
|
113 }
|
|
114 else if (key.isError() || key.isInvalidHandle())
|
|
115 {
|
3
|
116 log.trace("Error event fired");
|
0
|
117 // error, close connection
|
6
|
118 auto conn = cast(Dispatcher) key.attachment;
|
3
|
119 conn.handleError(&remConnection);
|
0
|
120 }
|
|
121 }
|
|
122 }
|
|
123 else if (eventCount == 0)
|
|
124 {
|
|
125 /* can't think of anything useful to do here. */
|
|
126 }
|
|
127 else
|
|
128 {
|
3
|
129 log.error("Selector.select returned {}", eventCount);
|
0
|
130 }
|
|
131 //add Conduits to listener
|
6
|
132 freshList.processAll( (ref Dispatcher h)
|
0
|
133 {
|
9
|
134 selector.register(h.transport, h.events(), h);
|
0
|
135 return 1;
|
|
136 });
|
6
|
137 remList.processAll( (ref Dispatcher h)
|
3
|
138 {
|
|
139 selector.unregister(h.transport);
|
|
140 return 1;
|
|
141 });
|
0
|
142
|
|
143 } while (running)
|
3
|
144
|
|
145 }
|
|
146
|
6
|
147 void processReturn(int result, Selector s, Dispatcher h)
|
3
|
148 {
|
|
149 switch(result)
|
|
150 {
|
8
|
151 case CLOSE:
|
|
152 s.unregister(h.transport);
|
9
|
153 h.transport.detach();
|
8
|
154 break;
|
3
|
155 case UNREGISTER:
|
|
156 s.unregister(h.transport);
|
|
157 break;
|
|
158 case REMAIN:
|
|
159 //this space intentially left blank
|
|
160 break;
|
|
161 case REGISTER:
|
|
162 s.register(h.transport, h.events(), h);
|
|
163 break;
|
|
164 case REREGISTER:
|
9
|
165 s.register(h.transport, h.events(), h);
|
3
|
166 break;
|
|
167 default:
|
5
|
168 log.error("processReturn: unknown return value");
|
3
|
169 }
|
0
|
170 }
|
|
171 }
|