annotate tango/tango/net/cluster/NetworkQueue.d @ 132:1700239cab2e trunk

[svn r136] MAJOR UNSTABLE UPDATE!!! Initial commit after moving to Tango instead of Phobos. Lots of bugfixes... This build is not suitable for most things.
author lindquist
date Fri, 11 Jan 2008 17:57:40 +0100
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
132
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
1 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
2
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
3 copyright: Copyright (c) 2004 Kris Bell. All rights reserved
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
4
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
5 license: BSD style: $(LICENSE)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
6
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
7 version: July 2004: Initial release
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
8
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
9 author: Kris
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
10
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
11 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
12
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
13 module tango.net.cluster.NetworkQueue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
14
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
15 private import tango.net.cluster.NetworkClient;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
16
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
17 /*******************************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
18
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
19 Exposes a gateway to the cluster queues, which collect ICached
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
20 objects until they are removed. Because there is a finite limit
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
21 to the quantity of entries stored, the put() method may throw a
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
22 ClusterFullException if it cannot add a new entry.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
23
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
24 *******************************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
25
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
26 class NetworkQueue : NetworkClient, IConsumer
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
27 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
28 private IChannel reply;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
29 private IConsumer consumer;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
30
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
31 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
32
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
33 Construct a NetworkMessage gateway on the provided QOS cluster
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
34 for the specified channel. Each subsequent queue operation
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
35 will take place over the given channel.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
36
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
37 You can listen for cluster replies by providing an optional
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
38 ChannelListener. Outgoing messages will be tagged appropriately
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
39 such that a consumer can respond using IEvent.reply
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
40
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
41 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
42
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
43 this (ICluster cluster, char[] channel, ChannelListener listener = null)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
44 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
45 super (cluster, channel);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
46
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
47 if (listener)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
48 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
49 reply = cluster.createChannel (channel ~ ".reply");
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
50 consumer = reply.createConsumer (listener);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
51 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
52 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
53
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
54 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
55
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
56 Add an IMessage entry to the corresponding queue. This
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
57 will throw a ClusterFullException if there is no space
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
58 left in the clustered queue.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
59
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
60 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
61
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
62 void put (IMessage message)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
63 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
64 assert (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
65
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
66 if (reply)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
67 message.reply = reply.name;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
68
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
69 channel.putQueue (message);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
70 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
71
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
72 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
73
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
74 Query the cluster for queued entries on our corresponding
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
75 channel. Returns, and removes, a matching entry from the
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
76 cluster. This is the synchronous (polling) approach; you
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
77 should use createConsumer() instead for asynchronous style
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
78 notification instead.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
79
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
80 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
81
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
82 IMessage get ()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
83 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
84 return channel.getQueue;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
85 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
86
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
87 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
88
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
89 Cancel the listener. No more events will be dispatched to
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
90 the reply ChannelListener.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
91
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
92 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
93
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
94 void cancel()
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
95 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
96 if (consumer)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
97 consumer.cancel;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
98 consumer = null;
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
99 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
100
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
101 /***********************************************************************
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
102
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
103 Create a listener for this channel. Listeners are invoked
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
104 when new content is placed into a corresponding queue.
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
105
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
106 ***********************************************************************/
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
107
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
108 IConsumer createConsumer (ChannelListener listener)
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
109 {
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
110 return channel.createConsumer (listener);
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
111 }
1700239cab2e [svn r136] MAJOR UNSTABLE UPDATE!!!
lindquist
parents:
diff changeset
112 }