comparison org.eclipse.core.jobs/src/org/eclipse/core/internal/jobs/WorkerPool.d @ 12:bc29606a740c

Added dwt-addons in original directory structure of eclipse.org
author Frank Benoit <benoit@tionex.de>
date Sat, 14 Mar 2009 18:23:29 +0100
parents
children 6f068362a363
comparison
equal deleted inserted replaced
11:43904fec5dca 12:bc29606a740c
1 /*******************************************************************************
2 * Copyright (c) 2003, 2007 IBM Corporation and others.
3 * All rights reserved. This program and the accompanying materials
4 * are made available under the terms of the Eclipse Public License v1.0
5 * which accompanies this distribution, and is available at
6 * http://www.eclipse.org/legal/epl-v10.html
7 *
8 * Contributors:
9 * IBM - Initial API and implementation
10 * Port to the D programming language:
11 * Frank Benoit <benoit@tionex.de>
12 *******************************************************************************/
13 module org.eclipse.core.internal.jobs.WorkerPool;
14
15 import java.lang.JThread;
16 import tango.core.sync.Mutex;
17 import tango.core.sync.Condition;
18 import tango.text.convert.Format;
19 import java.lang.all;
20 import java.util.Set;
21
22 import org.eclipse.core.runtime.Assert;
23 import org.eclipse.core.runtime.IStatus;
24 import org.eclipse.core.runtime.jobs.Job;
25 import org.eclipse.core.internal.jobs.JobManager;
26 import org.eclipse.core.internal.jobs.Worker;
27
28 import org.eclipse.core.internal.jobs.InternalJob;
29 import org.eclipse.core.internal.jobs.ThreadJob;
30
31 /**
32 * Maintains a pool of worker threads. Threads are constructed lazily as
33 * required, and are eventually discarded if not in use for awhile. This class
34 * maintains the thread creation/destruction policies for the job manager.
35 *
36 * Implementation note: all the data structures of this class are protected
37 * by the instance's object monitor. To avoid deadlock with third party code,
38 * this lock is never held when calling methods outside this class that may in
39 * turn use locks.
40 */
41 class WorkerPool {
42
43 protected Mutex mutex;
44 protected Condition condition;
45
46
47 /**
48 * Threads not used by their best before timestamp are destroyed.
49 */
50 private static const int BEST_BEFORE = 60000;
51 /**
52 * There will always be at least MIN_THREADS workers in the pool.
53 */
54 private static const int MIN_THREADS = 1;
55 /**
56 * Use the busy thread count to avoid starting new threads when a living
57 * thread is just doing house cleaning (notifying listeners, etc).
58 */
59 private int busyThreads = 0;
60
61 /**
62 * The default context class loader to use when creating worker threads.
63 */
64 // protected const ClassLoader defaultContextLoader;
65
66 /**
67 * Records whether new worker threads should be daemon threads.
68 */
69 private bool isDaemon = false;
70
71 private JobManager manager;
72 /**
73 * The number of workers in the threads array
74 */
75 private int numThreads = 0;
76 /**
77 * The number of threads that are currently sleeping
78 */
79 private int sleepingThreads = 0;
80 /**
81 * The living set of workers in this pool.
82 */
83 private Worker[] threads;
84
85 protected package this(JobManager manager) {
86 threads = new Worker[10];
87 this.manager = manager;
88 mutex = new Mutex;
89 condition = new Condition(mutex);
90 // this.defaultContextLoader = Thread.currentThread().getContextClassLoader();
91 }
92
93 /**
94 * Adds a worker to the list of workers.
95 */
96 private void add(Worker worker) {
97 synchronized(mutex){
98 int size = threads.length;
99 if (numThreads + 1 > size) {
100 Worker[] newThreads = new Worker[2 * size];
101 System.arraycopy(threads, 0, newThreads, 0, size);
102 threads = newThreads;
103 }
104 threads[numThreads++] = worker;
105 }
106 }
107
108 private void decrementBusyThreads() {
109 synchronized(mutex){
110 //impossible to have less than zero busy threads
111 if (--busyThreads < 0) {
112 if (JobManager.DEBUG)
113 Assert.isTrue(false, Integer.toString(busyThreads));
114 busyThreads = 0;
115 }
116 }
117 }
118
119 /**
120 * Signals the end of a job. Note that this method can be called under
121 * OutOfMemoryError conditions and thus must be paranoid about allocating objects.
122 */
123 protected void endJob(InternalJob job, IStatus result) {
124 decrementBusyThreads();
125 //need to end rule in graph before ending job so that 2 threads
126 //do not become the owners of the same rule in the graph
127 if ((job.getRule_package() !is null) && !(cast(ThreadJob)job )) {
128 //remove any locks this thread may be owning on that rule
129 manager.getLockManager().removeLockCompletely(JThread.currentThread(), job.getRule_package());
130 }
131 manager.endJob_package(job, result, true);
132 //ensure this thread no longer owns any scheduling rules
133 manager.implicitJobs.endJob(job);
134 }
135 package void endJob_package(InternalJob job, IStatus result) {
136 endJob(job, result);
137 }
138
139 /**
140 * Signals the death of a worker thread. Note that this method can be called under
141 * OutOfMemoryError conditions and thus must be paranoid about allocating objects.
142 */
143 protected void endWorker(Worker worker) {
144 synchronized(mutex){
145 if (remove(worker) && JobManager.DEBUG)
146 JobManager.debug_(Format("worker removed from pool: {}", worker)); //$NON-NLS-1$
147 }
148 }
149 package void endWorker_package(Worker worker) {
150 endWorker(worker);
151 }
152
153 private void incrementBusyThreads() {
154 synchronized(mutex){
155 //impossible to have more busy threads than there are threads
156 if (++busyThreads > numThreads) {
157 if (JobManager.DEBUG)
158 Assert.isTrue(false, Format( "{},{}", busyThreads, numThreads));
159 busyThreads = numThreads;
160 }
161 }
162 }
163
164 /**
165 * Notification that a job has been added to the queue. Wake a worker,
166 * creating a new worker if necessary. The provided job may be null.
167 */
168 protected package void jobQueued() {
169 synchronized(mutex){
170 //if there is a sleeping thread, wake it up
171 if (sleepingThreads > 0) {
172 condition.notify();
173 return;
174 }
175 //create a thread if all threads are busy
176 if (busyThreads >= numThreads) {
177 Worker worker = new Worker(this);
178 worker.setDaemon(isDaemon);
179 add(worker);
180 if (JobManager.DEBUG)
181 JobManager.debug_(Format("worker added to pool: {}", worker)); //$NON-NLS-1$
182 worker.start();
183 return;
184 }
185 }
186 }
187
188 /**
189 * Remove a worker thread from our list.
190 * @return true if a worker was removed, and false otherwise.
191 */
192 private bool remove(Worker worker) {
193 synchronized(mutex){
194 for (int i = 0; i < threads.length; i++) {
195 if (threads[i] is worker) {
196 System.arraycopy(threads, i + 1, threads, i, numThreads - i - 1);
197 threads[--numThreads] = null;
198 return true;
199 }
200 }
201 return false;
202 }
203 }
204
205 /**
206 * Sets whether threads created in the worker pool should be daemon threads.
207 */
208 void setDaemon(bool value) {
209 this.isDaemon = value;
210 }
211
212 protected void shutdown() {
213 synchronized(mutex){
214 condition.notifyAll();
215 }
216 }
217 package void shutdown_package() {
218 shutdown();
219 }
220
221 /**
222 * Sleep for the given duration or until woken.
223 */
224 private void sleep(long duration) {
225 synchronized(mutex){
226 sleepingThreads++;
227 busyThreads--;
228 if (JobManager.DEBUG)
229 JobManager.debug_(Format("worker sleeping for: {}ms", duration)); //$NON-NLS-1$ //$NON-NLS-2$
230 try {
231 condition.wait(duration/1000.0f);
232 } catch (InterruptedException e) {
233 if (JobManager.DEBUG)
234 JobManager.debug_("worker interrupted while waiting... :-|"); //$NON-NLS-1$
235 } finally {
236 sleepingThreads--;
237 busyThreads++;
238 }
239 }
240 }
241
242 /**
243 * Returns a new job to run. Returns null if the thread should die.
244 */
245 protected InternalJob startJob(Worker worker) {
246 //if we're above capacity, kill the thread
247 synchronized (mutex) {
248 if (!manager.isActive_package()) {
249 //must remove the worker immediately to prevent all threads from expiring
250 endWorker(worker);
251 return null;
252 }
253 //set the thread to be busy now in case of reentrant scheduling
254 incrementBusyThreads();
255 }
256 Job job = null;
257 try {
258 job = manager.startJob_package();
259 //spin until a job is found or until we have been idle for too long
260 long idleStart = System.currentTimeMillis();
261 while (manager.isActive_package() && job is null) {
262 long hint = manager.sleepHint_package();
263 if (hint > 0)
264 sleep(Math.min(hint, BEST_BEFORE));
265 job = manager.startJob_package();
266 //if we were already idle, and there are still no new jobs, then
267 // the thread can expire
268 synchronized (mutex) {
269 if (job is null && (System.currentTimeMillis() - idleStart > BEST_BEFORE) && (numThreads - busyThreads) > MIN_THREADS) {
270 //must remove the worker immediately to prevent all threads from expiring
271 endWorker(worker);
272 return null;
273 }
274 }
275 }
276 if (job !is null) {
277 //if this job has a rule, then we are essentially acquiring a lock
278 if ((job.getRule() !is null) && !(cast(ThreadJob)job )) {
279 //don't need to re-aquire locks because it was not recorded in the graph
280 //that this thread waited to get this rule
281 manager.getLockManager().addLockThread(JThread.currentThread(), job.getRule());
282 }
283 //see if we need to wake another worker
284 if (manager.sleepHint_package() <= 0)
285 jobQueued();
286 }
287 } finally {
288 //decrement busy thread count if we're not running a job
289 if (job is null)
290 decrementBusyThreads();
291 }
292 return job;
293 }
294 package InternalJob startJob_package(Worker worker) {
295 return startJob(worker);
296 }
297 }