comparison tango/tango/net/cluster/tina/CacheThread.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
comparing revisions 131:5825d48b27d1 and 132:1700239cab2e
/*******************************************************************************

        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        July 2004: Initial release

        author:         Kris

*******************************************************************************/

module tango.net.cluster.tina.CacheThread;

private import tango.core.Exception;

private import tango.net.cluster.NetworkRegistry;

private import tango.net.cluster.tina.ClusterCache,
               tango.net.cluster.tina.ClusterTypes,
               tango.net.cluster.tina.ClusterThread;

/******************************************************************************

        Thread for handling cache requests

******************************************************************************/

class CacheThread : ClusterThread
{
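    // the cache instance shared with the hosting server, and a
    // thread-private clone of the message-template registry
    // (see the constructor comments below)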
    private ClusterCache    cache;
    private NetworkRegistry registry;

    /**********************************************************************

            Note that the conduit stays open until the client kills it
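
            A sketch of the assumed call-site (illustrative only; this
            usage is inferred from the signature, not taken from this
            file):
            ---
            auto thread = new CacheThread (server, conduit, cluster, cache);
            ---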

    **********************************************************************/

    this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterCache cache)
    {
        super (server, conduit, cluster);

        // clone the registry so that we have our own set of
        // message templates to act as hosts. This eliminates
        // allocating hosts on the fly for load() requests
        registry = NetworkRegistry.shared.dup;

        // retain the cache instance
        this.cache = cache;
    }

    /**********************************************************************

            process client requests

    **********************************************************************/

    void dispatch ()
    {
        ProtocolWriter.Command cmd;
        long                   time;
        char[]                 channel;
        char[]                 element;

        // wait for request to arrive
        auto content = reader.getPacket (cmd, channel, element, time);

        switch (cmd)
        {
            case ProtocolWriter.Command.Add:
                logger.trace (sprint ("{} add cache entry '{}' on channel '{}'", client, element, channel));

                // return the content if we can't put it in the cache
                if (cache.put (channel, element, content, Time(time)))
                    writer.success ("success");
                else
                    writer.reply (content);
                break;

            case ProtocolWriter.Command.Copy:
                logger.trace (sprint ("{} copy cache entry '{}' on channel '{}'", client, element, channel));

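                // hand back whatever the cache currently holds for this
                // key (presumably empty content when there is no entry)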
                writer.reply (cache.get (channel, element));
                break;

            case ProtocolWriter.Command.Remove:
                logger.trace (sprint ("{} remove cache entry '{}' on channel '{}'", client, element, channel));

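                // extract both removes the entry and returns it, so the
                // client receives the evicted content in one round-trip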
                writer.reply (cache.extract (channel, element));
                break;

            case ProtocolWriter.Command.Load:
                logger.trace (sprint ("{} loading cache entry '{}' on channel '{}'", client, element, channel));

                load (cmd, channel, element);
                break;

            default:
                throw new IllegalArgumentException ("invalid command");
        }
    }
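
    /*
        Illustrative sketch (not part of the original source): the
        round-trip that dispatch() services, expressed with the same
        reader/writer calls used above. The client-side usage shown
        here is an assumption rather than documented Tango API:

            // client: request addition of 'element' on 'channel'
            writer.put (ProtocolWriter.Command.Add, channel, element, entry);

            // client: a success reply arrives, or the content comes
            // back when the entry could not be cached
            auto content = reader.getPacket (cmd, channel, element, time);
    */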

    /**********************************************************************

            Manages the loading of cache entries remotely, on the
            host that actually contains the cache entry.

            The benefit of this approach lies in the ability to
            'gate' access to specific resources across the entire
            network. That is, where particular cache entries are
            prohibitively costly to construct, it is worthwhile
            ensuring that cost is reduced to a bare minimum. These
            remote loaders allow the cache host to block multiple
            network clients until there's a new entry available.
            Without this mechanism, multiple network clients could
            request the same entry simultaneously, thereby increasing
            the overall cost. The end result is similar to that of
            a distributed transaction.

    **********************************************************************/

    void load (ProtocolWriter.Command cmd, char[] channel, char[] element)
    {
        // convert to a message instance. Note that we use a private
        // set of msg templates, so we don't collide with other threads
        auto msg = reader.thaw (registry);

        // check to see if it has already been updated or is
        // currently locked; go home if so, otherwise lock it
        if (cache.lock (channel, element, msg.time))
            try {
                // ensure this is the right object
                auto loader = cast(IMessageLoader) msg;
                if (loader)
                {
                    // acknowledge the request. Do NOT wait for completion!
                    writer.success.flush;
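
                    // note: the early flush above releases this client
                    // while the entry is constructed; other clients
                    // asking for the same entry remain blocked on the
                    // cache lock until the new entry is in place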

                    // get the new cache entry. The 'time' attribute should
                    // be set appropriately before return
                    if (auto e = loader.load)
                    {
                        long time;
                        // serialize new entry and stuff it into cache
                        writer.put (writer.Command.OK, channel, element, e);
                        cache.put (channel, element, reader.getPacket (cmd, channel, element, time), e.time);
                    }
                }
                else
                    writer.exception (sprint ("invalid remote cache-loader '{}'", msg.toString)).flush;

            } finally
                // ensure we unlock this one!
                cache.unlock (channel, element);
        else
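            // someone else already updated or locked this entry; a bare
            // acknowledgement presumably tells the client to fetch it
            // with a subsequent Copy request instead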
            writer.success.flush;
    }
}