Mercurial > projects > ldc
comparison tango/tango/net/cluster/tina/CacheThread.d @ 132:1700239cab2e trunk
[svn r136] MAJOR UNSTABLE UPDATE!!!
Initial commit after moving to Tango instead of Phobos.
Lots of bugfixes...
This build is not suitable for most things.
author | lindquist |
---|---|
date | Fri, 11 Jan 2008 17:57:40 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
131:5825d48b27d1 | 132:1700239cab2e |
---|---|
1 /******************************************************************************* | |
2 | |
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved | |
4 | |
5 license: BSD style: $(LICENSE) | |
6 | |
7 version: July 2004: Initial release | |
8 | |
9 author: Kris | |
10 | |
11 *******************************************************************************/ | |
12 | |
13 module tango.net.cluster.tina.CacheThread; | |
14 | |
15 private import tango.core.Exception; | |
16 | |
17 private import tango.net.cluster.NetworkRegistry; | |
18 | |
19 private import tango.net.cluster.tina.ClusterCache, | |
20 tango.net.cluster.tina.ClusterTypes, | |
21 tango.net.cluster.tina.ClusterThread; | |
22 | |
23 /****************************************************************************** | |
24 | |
25 Thread for handling cache requests | |
26 | |
27 ******************************************************************************/ | |
28 | |
29 class CacheThread : ClusterThread | |
30 { | |
31 private ClusterCache cache; | |
32 private NetworkRegistry registry; | |
33 | |
34 /********************************************************************** | |
35 | |
36 Note that the conduit stays open until the client kills it | |
37 | |
38 **********************************************************************/ | |
39 | |
40 this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterCache cache) | |
41 { | |
42 super (server, conduit, cluster); | |
43 | |
44 // clone the registry so that we have our own set of | |
45 // message templates to act as hosts. This eliminates | |
46 // allocating hosts on the fly for load() requests | |
47 registry = NetworkRegistry.shared.dup; | |
48 | |
49 // retain the cache instance | |
50 this.cache = cache; | |
51 } | |
52 | |
53 /********************************************************************** | |
54 | |
55 process client requests | |
56 | |
57 **********************************************************************/ | |
58 | |
59 void dispatch () | |
60 { | |
61 ProtocolWriter.Command cmd; | |
62 long time; | |
63 char[] channel; | |
64 char[] element; | |
65 | |
66 // wait for request to arrive | |
67 auto content = reader.getPacket (cmd, channel, element, time); | |
68 | |
69 switch (cmd) | |
70 { | |
71 case ProtocolWriter.Command.Add: | |
72 logger.trace (sprint ("{} add cache entry '{}' on channel '{}'", client, element, channel)); | |
73 | |
74 // return the content if we can't put it in the cache | |
75 if (cache.put (channel, element, content, Time(time))) | |
76 writer.success ("success"); | |
77 else | |
78 writer.reply (content); | |
79 break; | |
80 | |
81 case ProtocolWriter.Command.Copy: | |
82 logger.trace (sprint ("{} copy cache entry '{}' on channel '{}'", client, element, channel)); | |
83 | |
84 writer.reply (cache.get (channel, element)); | |
85 break; | |
86 | |
87 case ProtocolWriter.Command.Remove: | |
88 logger.trace (sprint ("{} remove cache entry '{}' on channel '{}'", client, element, channel)); | |
89 | |
90 writer.reply (cache.extract (channel, element)); | |
91 break; | |
92 | |
93 case ProtocolWriter.Command.Load: | |
94 logger.trace (sprint ("{} loading cache entry '{}' on channel '{}'", client, element, channel)); | |
95 | |
96 load (cmd, channel, element); | |
97 break; | |
98 | |
99 default: | |
100 throw new IllegalArgumentException ("invalid command"); | |
101 } | |
102 } | |
103 | |
104 | |
105 /********************************************************************** | |
106 | |
107 Manages the loading of cache entries remotely, upon | |
108 the host that actually contains the cache entry. | |
109 | |
110 The benefit of this approach lies in the ability to | |
111 'gate' access to specific resources across the entire | |
112 network. That is; where particular cache entries are | |
113 prohibitively costly to construct, it is worthwhile | |
114 ensuring that cost is reduced to a bare minimum. These | |
115 remote loaders allow the cache host to block multiple | |
116 network clients until there's a new entry available. | |
117 Without this mechanism, it would become possible for | |
118 multiple network clients to request the same entry | |
119 simultaneously, therefore increasing the overall cost. | |
120 The end result is similar to that of a distributed | |
121 transaction. | |
122 | |
123 **********************************************************************/ | |
124 | |
125 void load (ProtocolWriter.Command cmd, char[] channel, char[] element) | |
126 { | |
127 // convert to a message instance. Note that we use a private | |
128 // set of msg templates, so we don't collide with other threads | |
129 auto msg = reader.thaw (registry); | |
130 | |
131 // check to see if it has already been updated or is | |
132 // currently locked; go home if so, otherwise lock it | |
133 if (cache.lock (channel, element, msg.time)) | |
134 try { | |
135 // ensure this is the right object | |
136 auto loader = cast(IMessageLoader) msg; | |
137 if (loader) | |
138 { | |
139 // acknowledge the request. Do NOT wait for completion! | |
140 writer.success.flush; | |
141 | |
142 // get the new cache entry. The 'time' attribute should | |
143 // be set appropriately before return | |
144 if (auto e = loader.load) | |
145 { | |
146 long time; | |
147 // serialize new entry and stuff it into cache | |
148 writer.put (writer.Command.OK, channel, element, e); | |
149 cache.put (channel, element, reader.getPacket (cmd, channel, element, time), e.time); | |
150 } | |
151 } | |
152 else | |
153 writer.exception (sprint ("invalid remote cache-loader '{}'", msg.toString)).flush; | |
154 | |
155 } finally | |
156 // ensure we unlock this one! | |
157 cache.unlock (channel, element); | |
158 else | |
159 writer.success.flush; | |
160 } | |
161 } | |
162 |