/*******************************************************************************

        copyright:      Copyright (c) 2004 Kris Bell. All rights reserved

        license:        BSD style: $(LICENSE)

        version:        July 2004: Initial release

        author:         Kris

*******************************************************************************/

module tango.net.cluster.tina.CacheThread;

// exception types (IllegalArgumentException is thrown by dispatch)
private import tango.core.Exception;

// registry of message templates, duplicated per thread in the constructor
private import tango.net.cluster.NetworkRegistry;

// cluster cache, shared cluster types and the base thread class
private import tango.net.cluster.tina.ClusterCache;
private import tango.net.cluster.tina.ClusterTypes;
private import tango.net.cluster.tina.ClusterThread;

/******************************************************************************

        Worker thread that services cache requests arriving from one
        client connection

******************************************************************************/

class CacheThread : ClusterThread
{
    private ClusterCache    cache;
    private NetworkRegistry registry;

    /**********************************************************************

            Bind this thread to a server, a client conduit, the
            cluster and the cache it operates on.

            Note that the conduit stays open until the client
            kills it

    **********************************************************************/

    this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterCache cache)
    {
        super (server, conduit, cluster);

        // take a private duplicate of the shared registry: its message
        // templates then act as hosts for load() requests, so none have
        // to be allocated on the fly
        registry = NetworkRegistry.shared.dup;

        // hold on to the cache instance
        this.cache = cache;
    }

    /**********************************************************************

            Read one client request and act upon it.

            Handles the Add, Copy, Remove and Load commands; any
            other command raises an IllegalArgumentException

    **********************************************************************/

    void dispatch ()
    {
        ProtocolWriter.Command cmd;
        char[]                 channel;
        char[]                 element;
        long                   time;

        // block until a request shows up
        auto content = reader.getPacket (cmd, channel, element, time);

        switch (cmd)
        {
            case ProtocolWriter.Command.Add:
            {
                logger.trace (sprint ("{} add cache entry '{}' on channel '{}'", client, element, channel));

                // when the cache declines the entry, the content is
                // handed back to the client instead
                auto stored = cache.put (channel, element, content, Time(time));
                if (stored)
                {
                    writer.success ("success");
                }
                else
                {
                    writer.reply (content);
                }
                break;
            }

            case ProtocolWriter.Command.Copy:
            {
                logger.trace (sprint ("{} copy cache entry '{}' on channel '{}'", client, element, channel));

                auto found = cache.get (channel, element);
                writer.reply (found);
                break;
            }

            case ProtocolWriter.Command.Remove:
            {
                logger.trace (sprint ("{} remove cache entry '{}' on channel '{}'", client, element, channel));

                auto removed = cache.extract (channel, element);
                writer.reply (removed);
                break;
            }

            case ProtocolWriter.Command.Load:
            {
                logger.trace (sprint ("{} loading cache entry '{}' on channel '{}'", client, element, channel));

                load (cmd, channel, element);
                break;
            }

            default:
                throw new IllegalArgumentException ("invalid command");
        }
    }

    /**********************************************************************

            Loads a cache entry remotely, on the host that actually
            holds that entry.

            Doing the work here makes it possible to 'gate' access
            to a resource across the whole network. Where an entry
            is prohibitively costly to construct, that cost should
            be paid as rarely as possible: a remote loader lets the
            cache host block multiple network clients until a new
            entry is available. Without it, several clients could
            request the same entry simultaneously and so increase
            the overall cost. The end result is similar to that of
            a distributed transaction.

    **********************************************************************/

    void load (ProtocolWriter.Command cmd, char[] channel, char[] element)
    {
        // thaw the request into a message instance, using our private
        // set of templates so we don't collide with other threads
        auto message = reader.thaw (registry);

        // already updated, or currently locked by someone else: just
        // acknowledge and go home
        if (! cache.lock (channel, element, message.time))
        {
            writer.success.flush;
            return;
        }

        // the entry is locked by us from here on
        try {
            // the message has to be a loader to be of any use
            auto loader = cast(IMessageLoader) message;
            if (loader is null)
            {
                writer.exception (sprint ("invalid remote cache-loader '{}'", message.toString)).flush;
            }
            else
            {
                // acknowledge right away. Do NOT wait for completion!
                writer.success.flush;

                // construct the new entry; its 'time' attribute should
                // be set appropriately before the loader returns
                if (auto entry = loader.load)
                {
                    long stamp;

                    // serialize the new entry, then place the packet
                    // obtained from the reader into the cache.
                    // NOTE(review): getPacket is handed cmd, channel and
                    // element, and presumably reassigns them (dispatch
                    // passes it uninitialised locals) -- keep the call
                    // inline and in this order; confirm before changing
                    writer.put (writer.Command.OK, channel, element, entry);
                    cache.put (channel, element, reader.getPacket (cmd, channel, element, stamp), entry.time);
                }
            }
        } finally {
            // whatever happened above, release the lock
            cache.unlock (channel, element);
        }
    }
}
