132
|
1 /*******************************************************************************
|
|
2
|
|
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
|
|
4
|
|
5 license: BSD style: $(LICENSE)
|
|
6
|
|
7 version: July 2004: Initial release
|
|
8
|
|
9 author: Kris
|
|
10
|
|
11 *******************************************************************************/
|
|
12
|
|
13 module tango.net.cluster.tina.QueueThread;
|
|
14
|
|
15 private import tango.core.Exception;
|
|
16
|
|
17 private import tango.net.cluster.tina.ClusterQueue,
|
|
18 tango.net.cluster.tina.ClusterTypes,
|
|
19 tango.net.cluster.tina.ClusterThread;
|
|
20
|
|
21 /******************************************************************************
|
|
22
|
|
23 Thread for handling queue requests.
|
|
24
|
|
25 ******************************************************************************/
|
|
26
|
|
class QueueThread : ClusterThread
{
        // the queue that add/remove requests are applied to
        private ClusterQueue queue;

        /**********************************************************************

                Hand the server, conduit and cluster to the superclass
                and retain the queue to operate upon.

                Note that the conduit stays open until the client kills it

        **********************************************************************/

        this (AbstractServer server, IConduit conduit, Cluster cluster, ClusterQueue queue)
        {
                super (server, conduit, cluster);
                this.queue = queue;
        }

        /**********************************************************************

                Read a single request packet and act upon its command:

                AddQueue    - put the packet content on the named channel,
                              replying with success, or with a "full"
                              message when the queue declines the entry

                RemoveQueue - reply with whatever the queue returns for
                              the named channel

                Any other command raises an IllegalArgumentException

        **********************************************************************/

        void dispatch ()
        {
                ProtocolWriter.Command  command;
                char[]                  channelName,
                                        elementName;
                long                    timestamp;

                // wait for a request to arrive; the element name and the
                // timestamp are populated as well, but are not used here
                auto payload = reader.getPacket (command, channelName, elementName, timestamp);

                if (command == ProtocolWriter.Command.AddQueue)
                   {
                   logger.trace (sprint ("{} add queue entry on channel '{}'", client, channelName));

                   if (queue.put (channelName, payload))
                       writer.success;
                   else
                      writer.full ("cluster queue is full");
                   }
                else
                   if (command == ProtocolWriter.Command.RemoveQueue)
                      {
                      logger.trace (sprint ("{} remove queue entry on channel '{}'", client, channelName));

                      auto entry = queue.get (channelName);
                      writer.reply (entry);
                      }
                   else
                      throw new IllegalArgumentException ("invalid command");
        }
}
|