annotate tango/tango/net/cluster/tina/Cluster.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
132
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
2
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
4
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
5 license: BSD style: $(LICENSE)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
6
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
7 version: July 2004: Initial release
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
8
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
9 author: Kris
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
10
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
11 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
12
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
13 module tango.net.cluster.tina.Cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
14
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
15 private import tango.math.Random;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
16
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
17 private import tango.core.Thread,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
18 tango.core.Runtime,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
19 tango.core.Exception;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
20
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
21 private import tango.util.log.Log,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
22 tango.util.log.Logger;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
23
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
24 private import tango.time.Clock;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
25
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
26 private import tango.io.Buffer,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
27 tango.io.GrowBuffer;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
28
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
29 private import tango.io.model.IConduit;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
30
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
31 private import tango.net.Socket,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
32 tango.net.SocketConduit,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
33 tango.net.SocketListener,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
34 tango.net.InternetAddress,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
35 tango.net.MulticastConduit;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
36
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
37 private import tango.net.cluster.NetworkClient;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
38
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
39 public import tango.net.cluster.model.ICluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
40
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
41 private import tango.net.cluster.tina.RollCall,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
42 tango.net.cluster.tina.ProtocolReader,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
43 tango.net.cluster.tina.ProtocolWriter;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
44
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
45 private import Integer = tango.text.convert.Integer;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
46
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
47 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
48
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
49 QOS implementation for sockets. All cluster-client activity is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
50 gated through here by the higher level classes; NetworkQueue &
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
51 NetworkCache for example. You gain access to the cluster by
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
52 creating an instance of the QOS (quality of service) you desire
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
53 and either mapping client classes onto it, or usign it directly.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
54 For example:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
55 ---
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
56 import tango.net.cluster.tina.Cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
57
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
58 auto cluster = new Cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
59 cluster.join;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
60
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
61 auto channel = cluster.createChannel (...);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
62 channel.putQueue (...);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
63 channel.getQueue ();
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
64 ---
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
65
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
66 Please see the cluster clients for additional details. Currently
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
67 these include CacheInvalidator, CacheInvalidatee, NetworkMessage,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
68 NetworkTask, NetworkQueue, NetworkCache, NetworkCombo, plus the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
69 Client base-class.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
70
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
71 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
72
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
73 class Cluster : Broadcaster, ICluster
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
74 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
75 private FlexNodeSet task,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
76 queue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
77 private FixedNodeSet cache;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
78 private Logger logger;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
79
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
80 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
81
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
82 Create a cluster instance with a default logger and Nagle
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
83 caching disabled
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
84
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
85 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
86
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
87 this ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
88 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
89 this (Log.getLogger ("cluster.generic"), true);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
90 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
91
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
92 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
93
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
94 Create a cluster instance with the provided logger. Option
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
95 noDelay controls the settting of the Nagle algorithm on an
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
96 active connection to a server, which should be disabled by
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
97 default (noDelay == true)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
98
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
99 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
100
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
101 this (Logger log, bool noDelay = true)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
102 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
103 assert (log);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
104 logger = log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
105
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
106 task = new FlexNodeSet (log, noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
107 queue = new FlexNodeSet (log, noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
108 cache = new FixedNodeSet (log, noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
109 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
110
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
111 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
112
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
113 Join the cluster as a client, discovering servers. Client
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
114 applications should invoke this before making requests so
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
115 that there are some servers to address.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
116
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
117 If cache facilities will be used, then the join(cacheHosts)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
118 variation should be used instead
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
119
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
120 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
121
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
122 final Cluster join ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
123 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
124 // listen for cluster servers
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
125 auto channel = createChannel ("cluster.server.advertise");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
126 channel.createBulletinConsumer (&notify);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
127
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
128 // ask who's currently running
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
129 channel.broadcast (new RollCall);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
130 logger.trace ("discovering cluster nodes");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
131
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
132 // wait for enabled servers to respond ...
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
133 Thread.sleep (0.250);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
134 return this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
135 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
136
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
137 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
138
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
139 Join the cluster as a client, discovering servers. Client
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
140 applications should invoke this before making requests so
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
141 that there are some servers to address.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
142
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
143 If cache facilities will be used, use this method to set
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
144 the group of valid cache hosts. Each cache host should be
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
145 described as an array of machine-name and port pairs e.g.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
146 ---
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
147 ["lucy:1234", "daisy:3343", "daisy:3344"]
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
148 ---
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
149
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
150 This sets up a fixed set of cache hosts, which should be
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
151 identical for all cache clients. Cache hosts not included
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
152 in this list will be ignored when they come online.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
153
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
154 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
155
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
156 final Cluster join (char[][] cacheHosts)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
157 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
158 foreach (addr; cacheHosts)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
159 cache.addNode (new Node (log, addr, "cache"));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
160 return join;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
161 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
162
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
163 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
164
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
165 Return the logger instance provided during construction.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
166
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
167 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
168
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
169 final Logger log ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
170 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
171 return logger;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
172 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
173
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
174 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
175
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
176 Create a channel instance. Our channel implementation
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
177 includes a number of cached IO helpers (ProtocolWriter
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
178 and so on) which simplifies and speeds up execution.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
179
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
180 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
181
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
182 final IChannel createChannel (char[] channel)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
183 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
184 return new Channel (this, channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
185 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
186
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
187 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
188
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
189 ChannelListener method for listening to RollCall responses.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
190 These are sent out by cluster servers both when they get a
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
191 RollCall request, and when they heartbeat.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
192
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
193 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
194
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
195 private void notify (IEvent event)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
196 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
197 scope rollcall = new RollCall;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
198 event.get (rollcall);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
199
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
200 switch (rollcall.type)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
201 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
202 default:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
203 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
204
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
205 case RollCall.Task:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
206 task.enable (rollcall.addr, "task");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
207 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
208
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
209 case RollCall.Cache:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
210 cache.enable (rollcall.addr);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
211 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
212
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
213 case RollCall.Queue:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
214 queue.enable (rollcall.addr, "queue");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
215 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
216 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
217 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
218 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
219
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
220
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
221 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
222
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
223 Basic multicast support across the cluster. Multicast is used
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
224 for broadcasting messages to all nodes in the cluster. We use
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
225 it for cache-invalidation, heartbeat, rollcall and notification
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
226 of queue activity
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
227
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
228 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
229
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
230 private class Broadcaster
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
231 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
232 private static InternetAddress[char[]] groups;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
233 private Buffer mBuffer;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
234 private ProtocolWriter mWriter;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
235 private MulticastConduit mSocket;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
236
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
237 private int groupPort = 3333;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
238 private int groupPrefix = 225;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
239
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
240 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
241
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
242 Setup a Cluster instance. Currently the buffer & writer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
243 are shared for all bulletin serialization; this should
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
244 probably change at some point such that we can support
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
245 multiple threads broadcasting concurrently to different
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
246 output ports.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
247
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
248 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
249
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
250 this ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
251 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
252 mBuffer = new Buffer (1024 * 4);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
253 mSocket = new MulticastConduit;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
254 mWriter = new ProtocolWriter (mBuffer);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
255 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
256
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
257 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
258
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
259 Setup the multicast options. Port is used as the sole
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
260 address port for multicast usage, prefix is prepended
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
261 to each fabricated multicast address (should be a valid
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
262 class-D prefix), and ttl is the number of hops
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
263
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
264 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
265
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
266 final MulticastConduit conduit ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
267 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
268 return mSocket;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
269 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
270
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
271 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
272
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
273 Setup the multicast options. Port is used as the sole
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
274 address port for multicast usage & prefix is prepended
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
275 to each fabricated multicast address (should be a valid
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
276 class-D prefix: 225 through 239 inclusive)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
277
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
278 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
279
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
280 final void multicast (int port, int prefix=225)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
281 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
282 groupPort = port;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
283 groupPrefix = prefix;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
284 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
285
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
286 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
287
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
288 Broadcast a message on the specified channel. This uses
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
289 IP/Multicast to scatter the payload to all registered
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
290 listeners (on the same multicast group). Note that the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
291 maximum message size is limited to that of an Ethernet
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
292 data frame, minus the IP/UDP header size (1472 bytes).
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
293
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
294 Also note that we are synchronized to avoid contention
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
295 on the otherwise shared output buffer.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
296
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
297 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
298
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
299 final synchronized void broadcast (char[] channel, IMessage message=null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
300 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
301 // clear buffer and serialize content
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
302 mWriter.put (ProtocolWriter.Command.OK, channel, null, message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
303
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
304 // Ethernet data-frame size minus the 28 byte UDP/IP header:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
305 if (mBuffer.position > 1472)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
306 throw new ClusterException ("message is too large to broadcast");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
307
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
308 // send it to the appropriate multicast group
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
309 mSocket.write (mBuffer.slice, getGroup (channel));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
310 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
311
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
312 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
313
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
314 Return an internet address representing the multicast
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
315 group for the specified channel. We use three of the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
316 four address segments to represent the channel itself
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
317 (via a hash on the channel name), and set the primary
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
318 segment to be that of the broadcast prefix (above).
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
319
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
320 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
321
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
322 final synchronized InternetAddress getGroup (char[] channel)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
323 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
324 auto p = channel in groups;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
325 if (p)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
326 return *p;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
327
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
328 // construct a group address from the prefix & channel-hash,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
329 // where the hash is folded down to 24 bits
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
330 uint hash = jhash (channel.ptr, channel.length);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
331 hash = (hash >> 24) ^ (hash & 0x00ffffff);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
332
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
333 auto address = Integer.toString (groupPrefix) ~ "." ~
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
334 Integer.toString ((hash >> 16) & 0xff) ~ "." ~
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
335 Integer.toString ((hash >> 8) & 0xff) ~ "." ~
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
336 Integer.toString (hash & 0xff);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
337
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
338 // insert InternetAddress into hashmap
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
339 auto group = new InternetAddress (address, groupPort);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
340 groups [channel] = group;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
341 return group;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
342 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
343 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
344
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
345
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
346 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
347
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
348 A channel represents something akin to a publish/subscribe topic,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
349 or a radio station. These are used to segregate cluster operations
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
350 into a set of groups, where each group is represented by a channel.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
351 Channel names are whatever you want then to be: use of dot notation
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
352 has proved useful in the past.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
353
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
354 Channel maintain internal state in order to avoid heap activity. So
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
355 they should not be shared across threads without appropriate synchs
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
356 in place. One remedy is create another channel instance
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
357
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
358 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
359
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
360 private class Channel : IChannel
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
361 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
362 private char[] name_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
363 private Buffer buffer;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
364 private ProtocolReader reader;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
365 private ProtocolWriter writer;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
366 private Cluster cluster_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
367
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
368 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
369
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
370 Construct a channel with the specified name. We cache
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
371 a number of session-related constructs here also, in
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
372 order to eliminate runtime overhead
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
373
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
374 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
375
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
376 this (Cluster cluster, char[] name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
377 in {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
378 assert (cluster);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
379 assert (name.length);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
380 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
381 body
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
382 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
383 name_ = name;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
384 cluster_ = cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
385
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
386 // this buffer will grow as required to house larger messages
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
387 buffer = new GrowBuffer (1024 * 2);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
388 writer = new ProtocolWriter (buffer);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
389
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
390 // make the reader slice directly from the buffer content
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
391 reader = new ProtocolReader (buffer);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
392 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
393
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
394 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
395
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
396 Return the name of this channel. This is the name provided
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
397 when the channel was constructed.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
398
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
399 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
400
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
401 final char[] name ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
402 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
403 return name_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
404 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
405
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
406 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
407
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
408 Return the assigned cluster
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
409
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
410 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
411
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
412 final Cluster cluster ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
413 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
414 return cluster_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
415 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
416
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
417 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
418
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
419 Return the assigned logger
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
420
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
421 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
422
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
423 final Logger log ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
424 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
425 return cluster_.log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
426 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
427
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
428 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
429
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
430 Output this channel via the provided IWriter
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
431
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
432 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
433
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
434 final void write (IWriter writer)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
435 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
436 writer.put (name_);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
437 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
438
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
439 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
440
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
441 Input this channel via the provided IReader
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
442
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
443 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
444
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
445 final void read (IReader reader)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
446 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
447 reader.get (name_);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
448 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
449
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
450 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
451
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
452 deserialize a message into a provided host, or via
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
453 the registered instance of the incoming message
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
454
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
455 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
456
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
457 final IMessage thaw (IMessage host = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
458 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
459 return reader.thaw (host);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
460 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
461
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
462 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
463
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
464 Create a listener of the specified type. Listeners are
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
465 run within their own thread, since they spend the vast
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
466 majority of their time blocked on a Socket read. Would
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
467 be good to support multiplexed reading instead, such
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
468 that a thread pool could be applied instead.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
469
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
470 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
471
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
472 final IConsumer createConsumer (ChannelListener notify)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
473 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
474 cluster_.log.trace ("creating message consumer for '" ~ name_ ~ "'");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
475 return new MessageConsumer (this, notify);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
476 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
477
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
478 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
479
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
480 Create a listener of the specified type. Listeners are
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
481 run within their own thread, since they spend the vast
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
482 majority of their time blocked on a Socket read. Would
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
483 be good to support multiplexed reading instead, such
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
484 that a thread pool could be applied instead.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
485
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
486 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
487
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
488 final IConsumer createBulletinConsumer (ChannelListener notify)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
489 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
490 cluster_.log.trace ("creating bulletin consumer for '" ~ name_ ~ "'");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
491 return new BulletinConsumer (this, notify);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
492 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
493
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
494 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
495
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
496 Return a entry from the network cache, and optionally
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
497 remove it. This is a synchronous operation as opposed
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
498 to the asynchronous nature of an invalidate broadcast.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
499
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
500 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
501
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
502 final IMessage getCache (char[] key, bool remove, IMessage host = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
503 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
504 void send (IConduit conduit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
505 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
506 buffer.setConduit (conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
507 writer.put (remove ? ProtocolWriter.Command.Remove :
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
508 ProtocolWriter.Command.Copy, name_, key).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
509 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
510
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
511 if (cluster_.cache.request (&send, reader, key))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
512 return reader.thaw (host);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
513 return null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
514 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
515
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
516 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
517
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
518 Place an entry into the network cache, replacing the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
519 entry with the identical key. Where message.time is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
520 set, it will be used to test for newer cache entries
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
521 than the one being sent i.e. if someone else placed
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
522 a newer entry into the cache, that one will remain.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
523
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
524 Note that this may cause the oldest entry in the cache
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
525 to be displaced if the cache is already full.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
526
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
527 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
528
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
529 final bool putCache (char[] key, IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
530 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
531 void send (IConduit conduit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
532 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
533 buffer.setConduit (conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
534 writer.put (ProtocolWriter.Command.Add, name_, key, message).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
535 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
536
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
537 // return false if the cache server said there's
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
538 // already something newer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
539 if (cluster_.cache.request (&send, reader, key))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
540 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
541 return true;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
542 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
543
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
544 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
545
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
546 Load a network cache entry remotely. This sends the given
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
547 IMessage over a network to the cache host, where it will
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
548 be executed locally. The benefit of doing so it that the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
549 host may deny access to the cache entry for the duration
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
550 of the load operation. This, in turn, provides a mechanism
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
551 for gating/synchronizing multiple network clients over a
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
552 given cache entry; quite handy for those entries that are
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
553 relatively expensive to construct or access.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
554
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
555 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
556
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
557 final bool loadCache (char[] key, IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
558 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
559 void send (IConduit conduit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
560 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
561 buffer.setConduit (conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
562 writer.put (ProtocolWriter.Command.Load, name_, key, message).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
563 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
564
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
565 return cluster_.cache.request (&send, reader, key);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
566 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
567
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
568 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
569
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
570 Query the cluster for queued entries on the corresponding
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
571 channel. Returns, and removes, the first matching entry
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
572 from the cluster. Note that this sweeps the cluster for
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
573 matching entries, and is synchronous in nature. The more
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
574 common approach is to setup a queue listener, which will
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
575 grab and dispatch queue entries asynchronously.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
576
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
577 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
578
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
579 final IMessage getQueue (IMessage host = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
580 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
581 if (scanQueue)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
582 return reader.thaw (host);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
583 return null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
584 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
585
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
586 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
587
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
588 Query the cluster for queued entries on the corresponding
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
589 channel. Returns, and removes, the first matching entry
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
590 from the cluster. Note that this sweeps the cluster for
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
591 matching entries, and is synchronous in nature. The more
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
592 common approach is to setup a queue listener, which will
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
593 grab and dispatch queue entries asynchronously.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
594
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
595 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
596
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
597 private bool scanQueue ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
598 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
599 void send (IConduit conduit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
600 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
601 buffer.setConduit (conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
602 writer.put (ProtocolWriter.Command.RemoveQueue, name_).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
603 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
604
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
605 bool scan (Node node)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
606 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
607 bool message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
608 node.request (&send, reader, message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
609 return message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
610 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
611
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
612 // make a pass over each Node, looking for channel entries
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
613 return cluster_.queue.scan (&scan);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
614 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
615
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
616 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
617
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
618 Add an entry to the specified network queue. May throw a
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
619 QueueFullException if there's no room available.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
620
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
621 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
622
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
623 final IMessage putQueue (IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
624 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
625 void send (IConduit conduit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
626 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
627 buffer.setConduit (conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
628 writer.put (ProtocolWriter.Command.AddQueue, name_, null, message).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
629 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
630
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
631 cluster_.queue.request (&send, reader);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
632 return message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
633 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
634
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
635 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
636
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
637 Send a remote call request to a server, and place the result
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
638 back into the provided message
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
639
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
640 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
641
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
642 final bool execute (IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
643 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
644 void send (IConduit conduit)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
645 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
646 buffer.setConduit (conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
647 writer.put (ProtocolWriter.Command.Call, name_, null, message).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
648 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
649
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
650 if (cluster_.task.request (&send, reader))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
651 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
652 // place result back into the provided message
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
653 reader.thaw (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
654 return true;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
655 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
656 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
657 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
658
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
659 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
660
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
661 Broadcast a message on the specified channel. This uses
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
662 IP/Multicast to scatter the message to all registered
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
663 listeners (on the same multicast group). Note that the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
664 maximum message size is limited to that of an Ethernet
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
665 data frame, minus the IP/UDP header size (1472 bytes).
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
666
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
667 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
668
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
669 final void broadcast (IMessage message = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
670 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
671 cluster_.broadcast (name_, message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
672 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
673 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
674
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
675
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
676 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
677
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
678 A listener for multicast channel traffic. These are currently used
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
679 for cache coherency, queue publishing, and node discovery activity;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
680 though could be used for direct messaging also.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
681
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
682 Be careful when using the retained channel, since it is shared with
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
683 the calling thread. Thus a race condition could arise between the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
684 client and this thread, were both to use the channel for transfers
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
685 at the same instant. Note that MessageConsumer makes a copy of the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
686 channel for this purpose
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
687
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
688 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
689
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
690 private class BulletinConsumer : SocketListener, IConsumer, IEvent
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
691 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
692 private bool hasMore; // incoming message?
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
693 private Buffer buffer; // input buffer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
694 private ProtocolReader reader; // input decoder
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
695 private Channel channel_; // associated channel
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
696 private Cluster cluster; // associated cluster
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
697 private MulticastConduit consumer; // broadcast listener
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
698 private ChannelListener listener; // user-level callback
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
699
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
700 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
701
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
702 Construct a multicast consumer for the specified event. The
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
703 event handler will be invoked whenever a message arrives for
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
704 the associated channel.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
705
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
706 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
707
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
708 this (Channel channel, ChannelListener listener)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
709 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
710 this.channel_ = channel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
711 this.listener = listener;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
712 this.cluster = channel.cluster;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
713
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
714 // buffer doesn't need to be larger than Ethernet data-frame
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
715 buffer = new Buffer (1500);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
716
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
717 // make the reader slice directly from the buffer content
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
718 reader = new ProtocolReader (buffer);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
719
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
720 // configure a listener socket
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
721 consumer = new MulticastConduit (cluster.getGroup (channel_.name), true);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
722 consumer.join;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
723
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
724 super (consumer, buffer);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
725
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
726 // fire up this listener
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
727 super.execute;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
728 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
729
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
730 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
731
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
732 Notification callback invoked when we receive a multicast
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
733 packet. Note that we check the packet channel-name against
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
734 the one we're consuming, to check for cases where the group
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
735 address had a hash collision.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
736
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
737 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
738
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
739 override void notify (IBuffer buffer)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
740 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
741 ProtocolWriter.Command cmd;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
742 char[] channel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
743 char[] element;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
744
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
745 // read the incoming header, along with the object guid
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
746 // where available
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
747 hasMore = reader.getHeader (cmd, channel, element);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
748
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
749 // check it's really for us first (might be a hash collision)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
750 if (channel == this.channel_.name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
751 invoke (this);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
752 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
753
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
754 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
755
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
756 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
757
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
758 IMessage get (IMessage host = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
759 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
760 if (hasMore)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
761 return reader.thaw (host);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
762
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
763 throw new ClusterException ("attempting to thaw a non-existant message");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
764 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
765
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
766 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
767
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
768 Return the assigned logger
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
769
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
770 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
771
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
772 final Logger log ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
773 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
774 return cluster.log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
775 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
776
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
777 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
778
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
779 Handle error conditions from the listener thread.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
780
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
781 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
782
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
783 override void exception (char [] msg)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
784 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
785 cluster.log.error ("BulletinConsumer: "~msg);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
786 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
787
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
788 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
789
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
790 Overridable mean of notifying the client code.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
791
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
792 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
793
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
794 protected void invoke (IEvent event)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
795 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
796 listener (event);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
797 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
798
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
799 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
800
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
801 Return the cluster instance we're associated with.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
802
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
803 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
804
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
805 final Channel channel ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
806 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
807 return channel_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
808 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
809
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
810 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
811
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
812 Temporarily halt listening. This can be used to ignore
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
813 multicast messages while, for example, the consumer is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
814 busy doing other things.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
815
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
816 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
817
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
818 final void pauseGroup ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
819 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
820 consumer.leave;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
821 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
822
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
823 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
824
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
825 Resume listening, post-pause.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
826
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
827 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
828
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
829 final void resumeGroup ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
830 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
831 consumer.join;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
832 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
833
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
834 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
835
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
836 Cancel this consumer. The listener is effectively disabled
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
837 from this point forward. The listener thread does not halt
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
838 at this point, but waits until the socket-read returns.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
839 Note that the D Interface implementation requires us to
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
840 "reimplement and dispatch" trivial things like this ~ it's
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
841 a pain in the neck to maintain.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
842
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
843 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
844
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
845 final void cancel ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
846 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
847 super.cancel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
848 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
849
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
850 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
851
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
852 Send a message back to the producer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
853
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
854 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
855
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
856 void reply (IChannel channel, IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
857 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
858 assert (channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
859 assert (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
860
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
861 channel.broadcast (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
862 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
863
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
864
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
865 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
866
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
867 Return an appropriate reply channel for the given message,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
868 or return null if no reply is expected
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
869
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
870 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
871
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
872 IChannel replyChannel (IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
873 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
874 if (message.reply.length)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
875 return cluster.createChannel (message.reply);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
876 return null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
877 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
878 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
879
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
880
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
881 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
882
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
883 A listener for queue events. These events are produced by the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
884 queue host on a periodic bases when it has available entries.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
885 We listen for them (rather than constantly scanning) and then
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
886 begin a sweep to process as many as we can. Note that we will
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
887 be in competition with other nodes to process these entries.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
888
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
889 Also note that we create a copy of the channel in use, so that
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
890 race-conditions with the requesting client are avoided.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
891
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
892 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
893
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
894 private class MessageConsumer : BulletinConsumer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
895 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
896 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
897
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
898 Construct a multicast consumer for the specified event
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
899
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
900 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
901
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
902 this (Channel channel, ChannelListener listener)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
903 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
904 super (channel, listener);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
905
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
906 // create private channel instance to use in our thread
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
907 this.channel_ = new Channel (channel.cluster, channel.name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
908 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
909
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
910 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
911
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
912 Handle error conditions from the listener thread.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
913
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
914 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
915
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
916 override void exception (char [] msg)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
917 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
918 cluster.log.error ("MessageConsumer: "~msg);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
919 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
920
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
921 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
922
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
923 Overrides the default processing to sweep the cluster for
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
924 queued entries. Each server node is queried until one is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
925 found that contains a message. Note that it is possible
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
926 to set things up where we are told exactly which node to
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
927 go to; however given that we won't be listening whilst
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
928 scanning, and that there's likely to be a group of new
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
929 entries in the cluster, it's just as effective to scan.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
930 This will be far from ideal for all environments, so we
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
931 should make the strategy pluggable instead.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
932
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
933 Note also that the content is retrieved via a duplicate
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
934 channel to avoid potential race-conditions on the original
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
935
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
936 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
937
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
938 override IMessage get (IMessage host = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
939 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
940 if (channel.scanQueue)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
941 return channel.thaw (host);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
942 return null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
943 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
944
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
945 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
946
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
947 Send a message back to the producer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
948
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
949 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
950
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
951 override void reply (IChannel channel, IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
952 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
953 assert (channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
954 assert (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
955
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
956 channel.putQueue (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
957 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
958
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
959 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
960
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
961 Override the default notification handler in order to
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
962 disable multicast reciepts while the application does
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
963 what it needs to
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
964
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
965 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
966
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
967 override protected void invoke (IEvent event)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
968 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
969 // temporarily pause listening while processing
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
970 pauseGroup;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
971 try {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
972 listener (event);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
973 } finally resumeGroup;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
974 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
975 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
976
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
977
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
978 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
979
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
980 An abstraction of a socket connection. Used internally by the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
981 socket-based Cluster.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
982
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
983 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
984
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
985 private class Connection
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
986 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
987 abstract bool reset();
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
988
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
989 abstract void done (Time time);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
990
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
991 abstract SocketConduit conduit ();
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
992 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
993
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
994
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
995 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
996
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
997 A pool of socket connections for accessing cluster nodes. Note
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
998 that the entries will timeout after a period of inactivity, and
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
999 will subsequently cause a connected host to drop the supporting
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1000 session.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1001
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1002 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1003
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1004 private class ConnectionPool
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1005 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1006 private Logger log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1007 private int count;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1008 private bool noDelay;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1009 private InternetAddress address;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1010 private PoolConnection freelist;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1011 private TimeSpan timeout = TimeSpan.seconds(60);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1012
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1013 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1014
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1015 Utility class to provide the basic connection facilities
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1016 provided by the connection pool.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1017
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1018 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1019
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1020 static class PoolConnection : Connection
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1021 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1022 Time time;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1023 PoolConnection next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1024 ConnectionPool parent;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1025 SocketConduit conduit_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1026
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1027 /***************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1028
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1029 Construct a new connection and set its parent
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1030
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1031 ***************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1032
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1033 this (ConnectionPool pool)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1034 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1035 parent = pool;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1036 reset;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1037 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1038
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1039 /***************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1040
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1041 Create a new socket and connect it to the specified
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1042 server. This will cause a dedicated thread to start
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1043 on the server. Said thread will quit when an error
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1044 occurs.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1045
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1046 ***************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1047
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1048 final bool reset ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1049 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1050 try {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1051 conduit_ = new SocketConduit;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1052
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1053 // apply Nagle settings
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1054 conduit.socket.setNoDelay (parent.noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1055
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1056 // set a 500ms timeout for read operations
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1057 conduit_.setTimeout (TimeSpan.millis(500));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1058
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1059 // open a connection to this server
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1060 // parent.log.trace ("connecting to server");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1061 conduit_.connect (parent.address);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1062 return true;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1063
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1064 } catch (Object o)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1065 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1066 if (! Runtime.isHalting)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1067 parent.log.warn ("server is unavailable :: "~o.toString);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1068 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1069 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1070 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1071
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1072 /***************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1073
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1074 Return the socket belonging to this connection
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1075
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1076 ***************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1077
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1078 final SocketConduit conduit ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1079 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1080 return conduit_;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1081 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1082
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1083 /***************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1084
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1085 Close the socket. This will cause any host session
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1086 to be terminated.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1087
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1088 ***************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1089
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1090 final void close ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1091 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1092 conduit_.detach;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1093 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1094
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1095 /***************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1096
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1097 Return this connection to the free-list. Note that
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1098 we have to synchronize on the parent-pool itself.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1099
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1100 ***************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1101
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1102 final void done (Time time)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1103 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1104 synchronized (parent)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1105 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1106 next = parent.freelist;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1107 parent.freelist = this;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1108 this.time = time;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1109 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1110 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1111 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1112
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1113
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1114 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1115
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1116 Create a connection-pool for the specified address.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1117
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1118 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1119
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1120 this (InternetAddress address, Logger log, bool noDelay)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1121 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1122 this.log = log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1123 this.address = address;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1124 this.noDelay = noDelay;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1125 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1126
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1127 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1128
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1129 Allocate a Connection from a list rather than creating a
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1130 new one. Reap old entries as we go.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1131
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1132 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1133
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1134 final synchronized Connection borrow (Time time)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1135 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1136 if (freelist)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1137 do {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1138 auto c = freelist;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1139
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1140 freelist = c.next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1141 if (freelist && (time - c.time > timeout))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1142 c.close;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1143 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1144 return c;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1145 } while (true);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1146
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1147 return new PoolConnection (this);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1148 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1149
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1150 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1151
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1152 Close this pool and drop all existing connections.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1153
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1154 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1155
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1156 final synchronized void close ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1157 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1158 auto c = freelist;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1159 freelist = null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1160 while (c)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1161 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1162 c.close;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1163 c = c.next;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1164 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1165 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1166 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1167
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1168
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1169 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1170
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1171 Class to represent a cluster node. Each node supports both cache
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1172 and queue functionality. Note that the set of available nodes is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1173 configured at startup, simplifying the discovery process in some
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1174 significant ways, and causing less thrashing of cache-keys.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1175
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1176 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1177
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1178 private class Node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1179 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1180 private Logger log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1181 private char[] name,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1182 addr;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1183 private ConnectionPool pool;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1184 private bool enabled;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1185
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1186 alias void delegate (IConduit conduit) Requestor;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1187
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1188 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1189
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1190 Construct a node with the provided name. This name should
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1191 be the network name of the hosting device.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1192
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1193 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1194
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1195 this (Logger log, char[] addr, char[] name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1196 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1197 this.log = log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1198 this.addr = addr;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1199 this.name = name ~ ':' ~ addr;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1200 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1201
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1202 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1203
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1204 Add a cache/queue reference for the remote node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1205
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1206 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1207
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1208 final void setPool (InternetAddress address, bool noDelay)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1209 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1210 this.pool = new ConnectionPool (address, log, noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1211 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1212
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1213 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1214
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1215 Return the name of this node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1216
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1217 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1218
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1219 override char[] toString ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1220 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1221 return name;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1222 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1223
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1224 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1225
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1226 Return the network address of this node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1227
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1228 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1229
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1230 final char[] address ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1231 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1232 return addr;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1233 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1234
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1235 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1236
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1237 Remove this Node from the cluster. The node is disabled
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1238 until it is seen to recover.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1239
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1240 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1241
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1242 final void fail ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1243 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1244 setEnabled (false);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1245 pool.close;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1246 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1247
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1248 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1249
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1250 Get the current state of this node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1251
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1252 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1253
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1254 final bool isEnabled ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1255 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1256 volatile
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1257 return enabled;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1258 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1259
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1260 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1261
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1262 Set the enabled state of this node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1263
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1264 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1265
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1266 final void setEnabled (bool enabled)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1267 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1268 if (enabled)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1269 log.trace ("enabling "~name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1270 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1271 log.trace ("disabling "~name);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1272
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1273 volatile
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1274 this.enabled = enabled;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1275 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1276
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1277 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1278
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1279 request data; fail this Node if we can't connect. Note
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1280 that we make several attempts to connect before writing
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1281 the node off as a failure. We use a delegate to perform
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1282 the request output since it may be invoked on more than
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1283 one iteration, where the current attempt fails.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1284
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1285 We return true if the cluster node responds, and false
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1286 otherwise. Exceptions are thrown if they occured on the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1287 server. Parameter 'message' is set true if a message is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1288 available from the server response
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1289
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1290 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1291
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1292 final bool request (Requestor dg, ProtocolReader reader, out bool message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1293 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1294 ProtocolWriter.Command cmd;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1295 Time time;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1296 char[] channel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1297 char[] element;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1298
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1299 // it's possible that the pool may have failed between
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1300 // the point of selecting it, and the invocation itself
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1301 if (pool is null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1302 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1303
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1304 // get a connection to the server
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1305 auto connect = pool.borrow (time = Clock.now);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1306
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1307 // talk to the server (try a few times if necessary)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1308 for (int attempts=3; attempts--;)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1309 try {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1310 // attach connection to writer and send request
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1311 dg (connect.conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1312
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1313 // attach connection to reader
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1314 reader.buffer.setConduit (connect.conduit);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1315
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1316 // load the returned object. Don't retry on
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1317 // failed reads, since the server is either
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1318 // really really busy, or near death. We must
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1319 // assume it is offline until it tells us
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1320 // otherwise (via a heartbeat)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1321 attempts = 0;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1322 message = reader.getHeader (cmd, channel, element);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1323
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1324 // return borrowed connection
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1325 connect.done (time);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1326
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1327 } catch (RegistryException x)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1328 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1329 connect.done (time);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1330 throw x;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1331 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1332 catch (IOException x)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1333 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1334 log.trace ("IOException on server request :: "~x.toString);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1335
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1336 // attempt to reconnect?
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1337 if (attempts is 0 || !connect.reset)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1338 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1339 // that server is offline
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1340 fail;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1341
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1342 // state that we failed
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1343 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1344 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1345 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1346
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1347 // is message an exception?
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1348 if (cmd !is ProtocolWriter.Command.OK)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1349 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1350 // is node full?
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1351 if (cmd is ProtocolWriter.Command.Full)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1352 throw new ClusterFullException (channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1353
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1354 // did node barf?
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1355 if (cmd is ProtocolWriter.Command.Exception)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1356 throw new ClusterException (channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1357
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1358 // bogus response
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1359 throw new ClusterException ("invalid response from cluster server");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1360 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1361
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1362 // ok, our server responded
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1363 return true;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1364 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1365 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1366
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1367
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1368 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1369
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1370 Models a generic set of cluster nodes. This is intended to be
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1371 thread-safe, with no locking on a lookup operation
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1372
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1373 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1374
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1375 private class NodeSet
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1376 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1377 private Node[char[]] map;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1378 private Logger log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1379 private Set set;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1380 private bool noDelay;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1381
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1382 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1383
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1384 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1385
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1386 this (Logger log, bool noDelay)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1387 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1388 this.log = log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1389 this.set = new Set;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1390 this.noDelay = noDelay;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1391 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1392
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1393 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1394
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1395 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1396
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1397 final Logger logger ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1398 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1399 return log;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1400 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1401
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1402 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1403
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1404 Add a node to the list of servers
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1405
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1406 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1407
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1408 synchronized final Node addNode (Node node)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1409 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1410 auto addr = node.address;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1411 if (addr in map)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1412 throw new ClusterException ("Attempt to add cluster node '"~addr~"' more than once");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1413
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1414 map[addr] = node;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1415
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1416 // note that this creates a new Set instance. We do this
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1417 // so that selectNode() can avoid synchronization
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1418 set = set.add (node);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1419 return node;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1420 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1421
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1422 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1423
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1424 Select a cluster server based on a starting index. If the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1425 selected server is not currently enabled, we just try the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1426 next one. This behaviour should be consistent across each
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1427 cluster client.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1428
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1429 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1430
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1431 final Node selectNode (uint index)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1432 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1433 auto hosts = set.nodes;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1434 uint count = hosts.length;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1435
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1436 if (count)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1437 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1438 index %= count;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1439
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1440 while (count--)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1441 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1442 auto node = hosts [index];
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1443 if (node.isEnabled)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1444 return node;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1445
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1446 if (++index >= hosts.length)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1447 index = 0;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1448 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1449 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1450 throw new ClusterEmptyException ("No appropriate cluster nodes are available");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1451 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1452
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1453 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1454
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1455 Host class for the set of nodes. We utilize this to enable
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1456 atomic read/write where it would not be otherwise possible
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1457 -- D arrays are organized as ptr+length pairs and are thus
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1458 inherently non-atomic for assignment purposes
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1459
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1460 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1461
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1462 private static class Set
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1463 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1464 Node[] nodes,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1465 random;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1466
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1467 final Set add (Node node)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1468 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1469 auto s = new Set;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1470 s.nodes = nodes ~ node;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1471 s.randomize;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1472 return s;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1473 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1474
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1475 private final void randomize ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1476 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1477 // copy the node list
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1478 random = nodes.dup;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1479
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1480 // muddle up the duplicate list. This randomized list
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1481 // is used when scanning the cluster for queued entries
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1482 foreach (i, n; random)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1483 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1484 auto j = Random.shared.next (random.length);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1485 auto tmp = random[i];
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1486 random[i] = random[j];
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1487 random[j] = tmp;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1488 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1489 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1490 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1491 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1492
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1493
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1494 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1495
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1496 Models a fixed set of cluster nodes. Used for Cache
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1497
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1498 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1499
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1500 private class FixedNodeSet : NodeSet
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1501 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1502 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1503
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1504 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1505
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1506 this (Logger log, bool noDelay)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1507 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1508 super (log, noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1509 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1510
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1511 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1512
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1513 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1514
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1515 final synchronized void enable (char[] addr)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1516 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1517 auto p = addr in map;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1518 if (p)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1519 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1520 auto node = *p;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1521 if (! node.isEnabled)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1522 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1523 node.setPool (new InternetAddress(addr), noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1524 node.setEnabled (true);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1525 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1526 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1527 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1528 // don't throw when no cache hosts have been configured at all
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1529 if (set.nodes.length)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1530 throw new ClusterException ("Attempt to enable unregistered cache node '"~addr~"'");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1531 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1532
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1533 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1534
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1535 Select a cluster server based on the specified key. If the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1536 selected server is not currently enabled, we just try the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1537 next one. This behaviour should be consistent across each
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1538 cluster client.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1539
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1540 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1541
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1542 final bool request (Node.Requestor dg, ProtocolReader reader, char[] key)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1543 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1544 Node node;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1545 bool message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1546
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1547 do {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1548 node = selectNode (jhash (key.ptr, key.length));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1549 } while (! node.request (dg, reader, message));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1550
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1551 return message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1552 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1553 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1554
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1555
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1556 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1557
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1558 Models a flexible set of cluster nodes. Used for queue and task
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1559
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1560 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1561
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1562 private class FlexNodeSet : NodeSet
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1563 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1564 private uint rollover;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1565
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1566 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1567
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1568 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1569
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1570 this (Logger log, bool noDelay)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1571 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1572 super (log, noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1573 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1574
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1575 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1576
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1577 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1578
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1579 final synchronized void enable (char[] addr, char[] name)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1580 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1581 auto p = addr in map;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1582 auto node = p ? *p : addNode (new Node (log, addr, name));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1583
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1584 if (! node.isEnabled)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1585 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1586 node.setPool (new InternetAddress(addr), noDelay);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1587 node.setEnabled (true);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1588 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1589 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1590
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1591 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1592
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1593 Select a cluster server based on the specified key. If the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1594 selected server is not currently enabled, we just try the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1595 next one. This behaviour should be consistent across each
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1596 cluster client.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1597
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1598 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1599
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1600 final bool request (Node.Requestor dg, ProtocolReader reader)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1601 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1602 Node node;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1603 bool message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1604
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1605 do {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1606 node = selectNode (++rollover);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1607 } while (! node.request (dg, reader, message));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1608
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1609 return message;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1610 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1611
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1612 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1613
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1614 Sweep the cluster servers. Returns true if the delegate
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1615 returns true, false otherwise. The sweep is halted when
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1616 the delegate returns true. Note that this scans nodes in
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1617 a randomized pattern, which should tend to avoid 'bursty'
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1618 activity by a set of clients upon any one cluster server.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1619
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1620 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1621
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1622 final bool scan (bool delegate(Node) dg)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1623 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1624 auto hosts = set.random;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1625 auto index = hosts.length;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1626
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1627 while (index)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1628 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1629 // lookup the randomized set of server nodes
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1630 auto node = hosts [--index];
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1631
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1632 // callback on each enabled node
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1633 if (node.isEnabled)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1634 if (dg (node))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1635 return true;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1636 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1637 return false;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1638 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1639 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1640
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1641
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1642 /******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1643
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1644 The Bob Jenkins lookup2 algorithm. This should be relocated
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1645 to somewhere common
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1646
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1647 ******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1648
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1649 private static uint jhash (void* k, uint len, uint init = 0)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1650 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1651 uint a = 0x9e3779b9,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1652 b = 0x9e3779b9,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1653 c = init,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1654 i = len;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1655
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1656 // handle most of the key
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1657 while (i >= 12)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1658 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1659 a += *cast(uint*)(k+0);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1660 b += *cast(uint*)(k+4);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1661 c += *cast(uint*)(k+8);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1662
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1663 a -= b; a -= c; a ^= (c>>13);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1664 b -= c; b -= a; b ^= (a<<8);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1665 c -= a; c -= b; c ^= (b>>13);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1666 a -= b; a -= c; a ^= (c>>12);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1667 b -= c; b -= a; b ^= (a<<16);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1668 c -= a; c -= b; c ^= (b>>5);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1669 a -= b; a -= c; a ^= (c>>3);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1670 b -= c; b -= a; b ^= (a<<10);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1671 c -= a; c -= b; c ^= (b>>15);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1672 k += 12; i -= 12;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1673 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1674
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1675 // handle the last 11 bytes
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1676 c += len;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1677 switch (i)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1678 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1679 case 11: c+=(cast(uint)(cast(ubyte*)k)[10]<<24);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1680 case 10: c+=(cast(uint)(cast(ubyte*)k)[9]<<16);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1681 case 9 : c+=(cast(uint)(cast(ubyte*)k)[8]<<8);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1682 case 8 : b+=(cast(uint)(cast(ubyte*)k)[7]<<24);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1683 case 7 : b+=(cast(uint)(cast(ubyte*)k)[6]<<16);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1684 case 6 : b+=(cast(uint)(cast(ubyte*)k)[5]<<8);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1685 case 5 : b+=(cast(uint)(cast(ubyte*)k)[4]);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1686 case 4 : a+=(cast(uint)(cast(ubyte*)k)[3]<<24);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1687 case 3 : a+=(cast(uint)(cast(ubyte*)k)[2]<<16);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1688 case 2 : a+=(cast(uint)(cast(ubyte*)k)[1]<<8);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1689 case 1 : a+=(cast(uint)(cast(ubyte*)k)[0]);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1690 default:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1691 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1692
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1693 a -= b; a -= c; a ^= (c>>13);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1694 b -= c; b -= a; b ^= (a<<8);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1695 c -= a; c -= b; c ^= (b>>13);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1696 a -= b; a -= c; a ^= (c>>12);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1697 b -= c; b -= a; b ^= (a<<16);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1698 c -= a; c -= b; c ^= (b>>5);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1699 a -= b; a -= c; a ^= (c>>3);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1700 b -= c; b -= a; b ^= (a<<10);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1701 c -= a; c -= b; c ^= (b>>15);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1702
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1703 return c;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1704 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1705