comparison asyncdreactor/core/AsyncVat.d @ 11:5836613d16ac

reorg! reorg!
author rick@minifunk
date Tue, 12 Aug 2008 16:59:56 -0400
parents dreactor/core/AsyncVat.d@e75a2e506b1d
children
comparison
equal deleted inserted replaced
10:e75a2e506b1d 11:5836613d16ac
1 /*******************************************************************************
2
3 copyright: Copyright (c) 2008 Rick Richardson. All rights reserved
4
5 license: BSD style: $(LICENSE)
6
7 version: Initial release v0.1 : May 2008
8
9 author: Rick Richardson
10
11 *******************************************************************************/
12
13 module dreactor.core.Vat;
14
15 import tango.io.selector.Selector;
16 import tango.io.selector.model.ISelector;
17 import tango.core.Exception;
18 import tango.core.Thread;
19 import tango.core.Atomic;
20 import tango.util.collection.LinkSeq;
21 import tango.util.log.Log;
22
23 import dreactor.transport.AsyncSocketConduit;
24 import dreactor.core.Dispatcher;
25 import dreactor.util.ThreadSafeQueue;
26
/// Module-wide logger. It is assigned in the Vat constructor through
/// Log.lookup("dreactor.core.Vat").
Logger log;

/**
 * Result codes consumed by Vat.processReturn. The event loop passes it the
 * values returned by Dispatcher.handleIncoming, handleOutgoing and
 * handleDisconnect.
 *
 *   CLOSE       unregister the transport from the selector, then detach it
 *   UNREGISTER  unregister the transport from the selector
 *   REMAIN      leave the registration untouched
 *   REGISTER    register the transport with the handler's current events
 *   REREGISTER  handled exactly like REGISTER by processReturn
 */
enum : int
{
    CLOSE      = -2,
    UNREGISTER = -1,
    REMAIN     =  0,
    REGISTER   =  1,
    REREGISTER =  2
}

/// Identification string for this module.
static char[] version_string = "Vat.d 0.1 2008-05-31";
32
/**
 * Runs a Selector-driven event loop on a dedicated thread and forwards each
 * I/O event to the Dispatcher stored as the selection key's attachment.
 *
 * Handlers are not registered with the selector directly by callers: they
 * are queued through addConnection / remConnection and the event loop
 * applies the queued changes once per iteration.
 *
 * NOTE(review): the protection attributes in this class are written without
 * a trailing colon, so each one applies only to the single declaration that
 * follows it. Only `thread` and `eventLoop` are therefore private; the other
 * members have default visibility. This is kept as-is so existing callers
 * are unaffected -- confirm whether `private:` / `public:` was intended.
 */
class Vat
{
    /// Worker thread created by run(); null until run() has been called.
    private Thread thread;

    /// Loop flag: set by run(), cleared by exit(), tested by eventLoop().
    bool running;

    /// Not referenced anywhere else in this class.
    Atomic!(int) pending;

    /// Handlers queued by addConnection, waiting to be registered.
    ThreadSafeQueue!(Dispatcher) freshList;

    /// Handlers queued by remConnection, waiting to be unregistered.
    ThreadSafeQueue!(Dispatcher) remList;

    /// Creates the two pending-change queues and looks up the module logger.
    public this()
    {
        freshList = new ThreadSafeQueue!(Dispatcher);
        remList = new ThreadSafeQueue!(Dispatcher);
        log = Log.lookup("dreactor.core.Vat");
    }

    /// Marks the vat as running and starts the event loop on a new thread.
    void run()
    {
        running = true;
        thread = new Thread(&eventLoop);
        thread.start();
    }

    /**
     * Asks the event loop to stop. The flag is only tested at the end of a
     * loop iteration, so the loop finishes its current pass first.
     */
    void exit()
    {
        running = false;
    }

    /**
     * Blocks until the event loop thread has finished. Does nothing when
     * run() has never been called, instead of dereferencing a null thread.
     */
    void wait()
    {
        if (thread !is null)
            thread.join();
    }

    /**
     * Queues a handler for registration with the selector; the event loop
     * performs the registration at the end of an iteration.
     *
     * Returns: whatever ThreadSafeQueue.push returns for the handler.
     */
    bool addConnection(Dispatcher handler)
    {
        log.trace("adding handler");
        return freshList.push(handler);
    }

    /**
     * Queues a handler for removal from the selector; the event loop
     * performs the unregistration at the end of an iteration.
     *
     * Returns: whatever ThreadSafeQueue.push returns for the handler.
     */
    bool remConnection(Dispatcher handler)
    {
        return remList.push(handler);
    }

    /**
     * Thread body: polls the selector, dispatches the events that fired and
     * then applies queued registrations and unregistrations, repeating until
     * `running` is cleared.
     */
    private void eventLoop()
    {
        auto selector = new Selector();
        selector.open();
        // The selector was opened here, so close it here as well, whether
        // the loop ends normally or an exception unwinds through it.
        scope (exit) selector.close();

        do
        {
            // Bounded wait so that `running` and the pending queues are
            // re-examined regularly even when no I/O is happening.
            auto eventCount = selector.select(0.01);

            if (eventCount > 0)
            {
                // process events
                foreach (SelectionKey key; selector.selectedSet())
                {
                    // Only the first matching condition is handled for a key
                    // in a given pass.
                    if (key.isReadable())
                    {
                        // incoming data
                        log.trace("Read event fired");
                        auto conn = cast(Dispatcher) key.attachment;
                        if ( Dispatcher.State.listening == conn.getState() )
                            conn.handleConnection(conn.transport, &addConnection);
                        else
                            processReturn(conn.handleIncoming(), selector, conn);
                    }
                    else if (key.isWritable())
                    {
                        log.trace("Write event fired");
                        auto conn = cast(Dispatcher) key.attachment;
                        processReturn(conn.handleOutgoing(), selector, conn);
                    }
                    else if (key.isHangup())
                    {
                        log.trace("Hangup event fired");
                        auto conn = cast(Dispatcher) key.attachment;
                        processReturn(conn.handleDisconnect(), selector, conn);
                    }
                    else if (key.isError() || key.isInvalidHandle())
                    {
                        log.trace("Error event fired");
                        // error, close connection
                        auto conn = cast(Dispatcher) key.attachment;
                        conn.handleError(&remConnection);
                    }
                }
            }
            else if (eventCount == 0)
            {
                /* timed out with no events; nothing to do */
            }
            else
            {
                log.error("Selector.select returned {}", eventCount);
            }

            // Register the handlers queued through addConnection.
            freshList.processAll( (ref Dispatcher h)
            {
                selector.register(h.transport, h.events(), h);
                return 1;
            });

            // Unregister the handlers queued through remConnection.
            remList.processAll( (ref Dispatcher h)
            {
                selector.unregister(h.transport);
                return 1;
            });

        } while (running);
    }

    /**
     * Applies the result code returned by a Dispatcher event handler.
     *
     * Params:
     *   result = one of CLOSE, UNREGISTER, REMAIN, REGISTER, REREGISTER
     *   s      = selector the handler's transport is managed by
     *   h      = handler that produced the result
     *
     * Any other value is logged as an error and otherwise ignored.
     */
    void processReturn(int result, Selector s, Dispatcher h)
    {
        switch (result)
        {
            case CLOSE:
                s.unregister(h.transport);
                h.transport.detach();
                break;
            case UNREGISTER:
                s.unregister(h.transport);
                break;
            case REMAIN:
                // this space intentionally left blank
                break;
            case REGISTER:
            case REREGISTER:
                // Both codes result in the same register call.
                s.register(h.transport, h.events(), h);
                break;
            default:
                log.error("processReturn: unknown return value");
                break;
        }
    }
}