122
|
1 /*******************************************************************************
|
|
2 * Copyright (c) 2003, 2007 IBM Corporation and others.
|
|
3 * All rights reserved. This program and the accompanying materials
|
|
4 * are made available under the terms of the Eclipse Public License v1.0
|
|
5 * which accompanies this distribution, and is available at
|
|
6 * http://www.eclipse.org/legal/epl-v10.html
|
|
7 *
|
|
8 * Contributors:
|
|
9 * IBM - Initial API and implementation
|
|
10 * Port to the D programming language:
|
|
11 * Frank Benoit <benoit@tionex.de>
|
|
12 *******************************************************************************/
|
|
13 module dwtx.core.internal.jobs.WorkerPool;
|
|
14
|
|
15 import tango.core.Thread;
|
|
16 import tango.core.sync.Mutex;
|
|
17 import tango.core.sync.Condition;
|
|
18 import tango.text.convert.Format;
|
|
19 import dwt.dwthelper.utils;
|
|
20
|
|
21 import dwtx.core.runtime.Assert;
|
|
22 import dwtx.core.runtime.IStatus;
|
|
23 import dwtx.core.runtime.jobs.Job;
|
|
24 import dwtx.core.internal.jobs.JobManager;
|
|
25 import dwtx.core.internal.jobs.Worker;
|
|
26
|
|
27 import dwtx.core.internal.jobs.InternalJob;
|
|
28 import dwtx.core.internal.jobs.ThreadJob;
|
|
29
|
|
30 /**
|
|
31 * Maintains a pool of worker threads. Threads are constructed lazily as
|
|
32 * required, and are eventually discarded if not in use for awhile. This class
|
|
33 * maintains the thread creation/destruction policies for the job manager.
|
|
34 *
|
|
35 * Implementation note: all the data structures of this class are protected
|
|
36 * by the instance's object monitor. To avoid deadlock with third party code,
|
|
37 * this lock is never held when calling methods outside this class that may in
|
|
38 * turn use locks.
|
|
39 */
|
|
class WorkerPool {

    // Guards every mutable field of this class (threads, numThreads,
    // busyThreads, sleepingThreads). `condition` is signalled when a job
    // is queued and waited on by idle workers in sleep().
    protected Mutex mutex;
    protected Condition condition;


    /**
     * Threads not used by their best before timestamp are destroyed.
     */
    private static const int BEST_BEFORE = 60000;
    /**
     * There will always be at least MIN_THREADS workers in the pool.
     */
    private static const int MIN_THREADS = 1;
    /**
     * Use the busy thread count to avoid starting new threads when a living
     * thread is just doing house cleaning (notifying listeners, etc).
     */
    private int busyThreads = 0;

    /**
     * The default context class loader to use when creating worker threads.
     */
    // protected const ClassLoader defaultContextLoader;

    /**
     * Records whether new worker threads should be daemon threads.
     */
    private bool isDaemon = false;

    // The job manager this pool serves; never null after construction.
    private JobManager manager;
    /**
     * The number of workers in the threads array
     */
    private int numThreads = 0;
    /**
     * The number of threads that are currently sleeping
     */
    private int sleepingThreads = 0;
    /**
     * The living set of workers in this pool.
     */
    private Worker[] threads;

    /**
     * Creates a worker pool for the given job manager. Starts with room
     * for 10 workers; the array grows on demand in add().
     */
    protected package this(JobManager manager) {
        threads = new Worker[10];
        this.manager = manager;
        mutex = new Mutex;
        condition = new Condition(mutex);
        // this.defaultContextLoader = Thread.currentThread().getContextClassLoader();
    }

    /**
     * Adds a worker to the list of workers.
     */
    private void add(Worker worker) {
        synchronized(mutex){
            int size = threads.length;
            // Grow by doubling when full; existing entries are preserved.
            if (numThreads + 1 > size) {
                Worker[] newThreads = new Worker[2 * size];
                System.arraycopy(threads, 0, newThreads, 0, size);
                threads = newThreads;
            }
            threads[numThreads++] = worker;
        }
    }

    /**
     * Decrements the busy-thread count, clamping at zero. A negative count
     * indicates an unbalanced increment/decrement pair; under DEBUG this
     * triggers an assertion rather than silently clamping.
     */
    private void decrementBusyThreads() {
        synchronized(mutex){
            //impossible to have less than zero busy threads
            if (--busyThreads < 0) {
                if (JobManager.DEBUG)
                    Assert.isTrue(false, Integer.toString(busyThreads));
                busyThreads = 0;
            }
        }
    }

    /**
     * Signals the end of a job. Note that this method can be called under
     * OutOfMemoryError conditions and thus must be paranoid about allocating objects.
     */
    protected void endJob(InternalJob job, IStatus result) {
        decrementBusyThreads();
        //need to end rule in graph before ending job so that 2 threads
        //do not become the owners of the same rule in the graph
        if ((job.getRule_package() !is null) && !(cast(ThreadJob)job )) {
            //remove any locks this thread may be owning on that rule
            manager.getLockManager().removeLockCompletely(Thread.getThis(), job.getRule_package());
        }
        manager.endJob_package(job, result, true);
        //ensure this thread no longer owns any scheduling rules
        manager.implicitJobs.endJob(job);
    }
    // Package-visible forwarder so sibling job-manager classes can end a job.
    package void endJob_package(InternalJob job, IStatus result) {
        endJob(job, result);
    }

    /**
     * Signals the death of a worker thread. Note that this method can be called under
     * OutOfMemoryError conditions and thus must be paranoid about allocating objects.
     */
    protected void endWorker(Worker worker) {
        synchronized(mutex){
            if (remove(worker) && JobManager.DEBUG)
                JobManager.debug_(Format("worker removed from pool: {}", worker)); //$NON-NLS-1$
        }
    }
    // Package-visible forwarder so Worker can retire itself from the pool.
    package void endWorker_package(Worker worker) {
        endWorker(worker);
    }

    /**
     * Increments the busy-thread count, clamping at numThreads. A count above
     * the pool size indicates an unbalanced pair; under DEBUG this triggers
     * an assertion rather than silently clamping.
     */
    private void incrementBusyThreads() {
        synchronized(mutex){
            //impossible to have more busy threads than there are threads
            if (++busyThreads > numThreads) {
                if (JobManager.DEBUG)
                    Assert.isTrue(false, Format( "{},{}", busyThreads, numThreads));
                busyThreads = numThreads;
            }
        }
    }

    /**
     * Notification that a job has been added to the queue. Wake a worker,
     * creating a new worker if necessary. The provided job may be null.
     */
    protected package void jobQueued() {
        synchronized(mutex){
            //if there is a sleeping thread, wake it up
            if (sleepingThreads > 0) {
                condition.notify();
                return;
            }
            //create a thread if all threads are busy
            // NOTE(review): worker.start() runs while the pool mutex is held,
            // which departs from the class comment about never holding the
            // lock across external calls; this mirrors the upstream Eclipse
            // implementation (synchronized method) — confirm before changing.
            if (busyThreads >= numThreads) {
                Worker worker = new Worker(this);
                worker.isDaemon(isDaemon);
                add(worker);
                if (JobManager.DEBUG)
                    JobManager.debug_(Format("worker added to pool: {}", worker)); //$NON-NLS-1$
                worker.start();
                return;
            }
        }
    }

    /**
     * Remove a worker thread from our list.
     * @return true if a worker was removed, and false otherwise.
     */
    private bool remove(Worker worker) {
        synchronized(mutex){
            // Scans the full array (not just numThreads entries); slots past
            // numThreads are null, so the identity check simply fails there.
            for (int i = 0; i < threads.length; i++) {
                if (threads[i] is worker) {
                    // Shift the tail left by one and clear the freed slot.
                    System.arraycopy(threads, i + 1, threads, i, numThreads - i - 1);
                    threads[--numThreads] = null;
                    return true;
                }
            }
            return false;
        }
    }

    /**
     * Sets whether threads created in the worker pool should be daemon threads.
     */
    void setDaemon(bool value) {
        this.isDaemon = value;
    }

    /**
     * Shuts the pool down by waking all sleeping workers; each worker then
     * observes that the manager is no longer active and expires in startJob().
     */
    protected void shutdown() {
        synchronized(mutex){
            condition.notifyAll();
        }
    }
    // Package-visible forwarder so the job manager can shut the pool down.
    package void shutdown_package() {
        shutdown();
    }

    /**
     * Sleep for the given duration or until woken.
     * The caller counts as sleeping (not busy) while waiting; the busy/
     * sleeping counters are restored in the finally block even if the wait
     * is interrupted. `duration` is in milliseconds.
     */
    private void sleep(long duration) {
        synchronized(mutex){
            sleepingThreads++;
            busyThreads--;
            if (JobManager.DEBUG)
                JobManager.debug_(Format("worker sleeping for: {}ms", duration)); //$NON-NLS-1$ //$NON-NLS-2$
            try {
                // Tango's Condition.wait takes an interval in seconds, hence
                // the ms -> s conversion — TODO confirm against the Tango API.
                condition.wait(duration/1000.0f);
            } catch (InterruptedException e) {
                if (JobManager.DEBUG)
                    JobManager.debug_("worker interrupted while waiting... :-|"); //$NON-NLS-1$
            } finally {
                sleepingThreads--;
                busyThreads++;
            }
        }
    }

    /**
     * Returns a new job to run. Returns null if the thread should die.
     */
    protected InternalJob startJob(Worker worker) {
        //if we're above capacity, kill the thread
        synchronized (mutex) {
            if (!manager.isActive_package()) {
                //must remove the worker immediately to prevent all threads from expiring
                endWorker(worker);
                return null;
            }
            //set the thread to be busy now in case of reentrant scheduling
            incrementBusyThreads();
        }
        Job job = null;
        try {
            job = manager.startJob_package();
            //spin until a job is found or until we have been idle for too long
            long idleStart = System.currentTimeMillis();
            while (manager.isActive_package() && job is null) {
                long hint = manager.sleepHint_package();
                if (hint > 0)
                    sleep(Math.min(hint, BEST_BEFORE));
                job = manager.startJob_package();
                //if we were already idle, and there are still no new jobs, then
                // the thread can expire
                synchronized (mutex) {
                    if (job is null && (System.currentTimeMillis() - idleStart > BEST_BEFORE) && (numThreads - busyThreads) > MIN_THREADS) {
                        //must remove the worker immediately to prevent all threads from expiring
                        endWorker(worker);
                        return null;
                    }
                }
            }
            if (job !is null) {
                //if this job has a rule, then we are essentially acquiring a lock
                if ((job.getRule() !is null) && !(cast(ThreadJob)job )) {
                    //don't need to re-aquire locks because it was not recorded in the graph
                    //that this thread waited to get this rule
                    manager.getLockManager().addLockThread(Thread.getThis(), job.getRule());
                }
                //see if we need to wake another worker
                if (manager.sleepHint_package() <= 0)
                    jobQueued();
            }
        } finally {
            //decrement busy thread count if we're not running a job
            if (job is null)
                decrementBusyThreads();
        }
        return job;
    }
    // Package-visible forwarder so Worker's run loop can fetch its next job.
    package InternalJob startJob_package(Worker worker) {
        return startJob(worker);
    }
}
|