annotate tango/tango/net/cluster/tina/CacheThread.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
132
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
2
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
4
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
5 license: BSD style: $(LICENSE)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
6
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
7 version: July 2004: Initial release
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
8
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
9 author: Kris
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
10
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
11 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
12
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
13 module tango.net.cluster.tina.CacheThread;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
14
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
15 private import tango.core.Exception;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
16
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
17 private import tango.net.cluster.NetworkRegistry;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
18
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
19 private import tango.net.cluster.tina.ClusterCache,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
20 tango.net.cluster.tina.ClusterTypes,
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
21 tango.net.cluster.tina.ClusterThread;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
22
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
23 /******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
24
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
25 Thread for handling cache requests
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
26
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
27 ******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
28
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
29 class CacheThread : ClusterThread
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
30 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
31 private ClusterCache cache;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
32 private NetworkRegistry registry;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
33
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
34 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
35
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
36 Note that the conduit stays open until the client kills it
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
37
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
38 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
39
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
40 this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterCache cache)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
41 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
42 super (server, conduit, cluster);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
43
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
44 // clone the registry so that we have our own set of
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
45 // message templates to act as hosts. This eliminates
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
46 // allocating hosts on the fly for load() requests
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
47 registry = NetworkRegistry.shared.dup;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
48
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
49 // retain the cache instance
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
50 this.cache = cache;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
51 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
52
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
53 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
54
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
55 process client requests
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
56
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
57 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
58
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
59 void dispatch ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
60 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
61 ProtocolWriter.Command cmd;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
62 long time;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
63 char[] channel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
64 char[] element;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
65
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
66 // wait for request to arrive
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
67 auto content = reader.getPacket (cmd, channel, element, time);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
68
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
69 switch (cmd)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
70 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
71 case ProtocolWriter.Command.Add:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
72 logger.trace (sprint ("{} add cache entry '{}' on channel '{}'", client, element, channel));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
73
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
74 // return the content if we can't put it in the cache
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
75 if (cache.put (channel, element, content, Time(time)))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
76 writer.success ("success");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
77 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
78 writer.reply (content);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
79 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
80
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
81 case ProtocolWriter.Command.Copy:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
82 logger.trace (sprint ("{} copy cache entry '{}' on channel '{}'", client, element, channel));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
83
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
84 writer.reply (cache.get (channel, element));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
85 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
86
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
87 case ProtocolWriter.Command.Remove:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
88 logger.trace (sprint ("{} remove cache entry '{}' on channel '{}'", client, element, channel));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
89
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
90 writer.reply (cache.extract (channel, element));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
91 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
92
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
93 case ProtocolWriter.Command.Load:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
94 logger.trace (sprint ("{} loading cache entry '{}' on channel '{}'", client, element, channel));
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
95
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
96 load (cmd, channel, element);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
97 break;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
98
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
99 default:
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
100 throw new IllegalArgumentException ("invalid command");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
101 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
102 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
103
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
104
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
105 /**********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
106
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
107 Manages the loading of cache entries remotely, upon
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
108 the host that actually contains the cache entry.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
109
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
110 The benefit of this approach lies in the ability to
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
111 'gate' access to specific resources across the entire
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
112 network. That is; where particular cache entries are
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
113 prohibitively costly to construct, it is worthwhile
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
114 ensuring that cost is reduced to a bare minimum. These
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
115 remote loaders allow the cache host to block multiple
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
116 network clients until there's a new entry available.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
117 Without this mechanism, it would become possible for
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
118 multiple network clients to request the same entry
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
119 simultaneously, therefore increasing the overall cost.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
120 The end result is similar to that of a distributed
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
121 transaction.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
122
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
123 **********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
124
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
125 void load (ProtocolWriter.Command cmd, char[] channel, char[] element)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
126 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
127 // convert to a message instance. Note that we use a private
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
128 // set of msg templates, so we don't collide with other threads
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
129 auto msg = reader.thaw (registry);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
130
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
131 // check to see if it has already been updated or is
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
132 // currently locked; go home if so, otherwise lock it
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
133 if (cache.lock (channel, element, msg.time))
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
134 try {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
135 // ensure this is the right object
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
136 auto loader = cast(IMessageLoader) msg;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
137 if (loader)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
138 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
139 // acknowledge the request. Do NOT wait for completion!
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
140 writer.success.flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
141
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
142 // get the new cache entry. The 'time' attribute should
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
143 // be set appropriately before return
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
144 if (auto e = loader.load)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
145 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
146 long time;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
147 // serialize new entry and stuff it into cache
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
148 writer.put (writer.Command.OK, channel, element, e);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
149 cache.put (channel, element, reader.getPacket (cmd, channel, element, time), e.time);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
150 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
151 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
152 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
153 writer.exception (sprint ("invalid remote cache-loader '{}'", msg.toString)).flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
154
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
155 } finally
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
156 // ensure we unlock this one!
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
157 cache.unlock (channel, element);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
158 else
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
159 writer.success.flush;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
160 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
161 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
162