Mercurial > projects > dwt-addons
comparison dwtx/core/internal/jobs/WorkerPool.d @ 122:9d0585bcb7aa
Add core.jobs package
author | Frank Benoit <benoit@tionex.de> |
---|---|
date | Tue, 12 Aug 2008 02:34:21 +0200 |
parents | |
children | 862b05e0334a |
comparison
equal
deleted
inserted
replaced
121:c0304616ea23 | 122:9d0585bcb7aa |
---|---|
1 /******************************************************************************* | |
2 * Copyright (c) 2003, 2007 IBM Corporation and others. | |
3 * All rights reserved. This program and the accompanying materials | |
4 * are made available under the terms of the Eclipse Public License v1.0 | |
5 * which accompanies this distribution, and is available at | |
6 * http://www.eclipse.org/legal/epl-v10.html | |
7 * | |
8 * Contributors: | |
9 * IBM - Initial API and implementation | |
10 * Port to the D programming language: | |
11 * Frank Benoit <benoit@tionex.de> | |
12 *******************************************************************************/ | |
13 module dwtx.core.internal.jobs.WorkerPool; | |
14 | |
15 import tango.core.Thread; | |
16 import tango.core.sync.Mutex; | |
17 import tango.core.sync.Condition; | |
18 import tango.text.convert.Format; | |
19 import dwt.dwthelper.utils; | |
20 | |
21 import dwtx.core.runtime.Assert; | |
22 import dwtx.core.runtime.IStatus; | |
23 import dwtx.core.runtime.jobs.Job; | |
24 import dwtx.core.internal.jobs.JobManager; | |
25 import dwtx.core.internal.jobs.Worker; | |
26 | |
27 import dwtx.core.internal.jobs.InternalJob; | |
28 import dwtx.core.internal.jobs.ThreadJob; | |
29 | |
30 /** | |
31 * Maintains a pool of worker threads. Threads are constructed lazily as | |
32 * required, and are eventually discarded if not in use for awhile. This class | |
33 * maintains the thread creation/destruction policies for the job manager. | |
34 * | |
35 * Implementation note: all the data structures of this class are protected | |
36 * by the instance's object monitor. To avoid deadlock with third party code, | |
37 * this lock is never held when calling methods outside this class that may in | |
38 * turn use locks. | |
39 */ | |
40 class WorkerPool { | |
41 | |
42 protected Mutex mutex; | |
43 protected Condition condition; | |
44 | |
45 | |
46 /** | |
47 * Threads not used by their best before timestamp are destroyed. | |
48 */ | |
49 private static const int BEST_BEFORE = 60000; | |
50 /** | |
51 * There will always be at least MIN_THREADS workers in the pool. | |
52 */ | |
53 private static const int MIN_THREADS = 1; | |
54 /** | |
55 * Use the busy thread count to avoid starting new threads when a living | |
56 * thread is just doing house cleaning (notifying listeners, etc). | |
57 */ | |
58 private int busyThreads = 0; | |
59 | |
60 /** | |
61 * The default context class loader to use when creating worker threads. | |
62 */ | |
63 // protected const ClassLoader defaultContextLoader; | |
64 | |
65 /** | |
66 * Records whether new worker threads should be daemon threads. | |
67 */ | |
68 private bool isDaemon = false; | |
69 | |
70 private JobManager manager; | |
71 /** | |
72 * The number of workers in the threads array | |
73 */ | |
74 private int numThreads = 0; | |
75 /** | |
76 * The number of threads that are currently sleeping | |
77 */ | |
78 private int sleepingThreads = 0; | |
79 /** | |
80 * The living set of workers in this pool. | |
81 */ | |
82 private Worker[] threads; | |
83 | |
84 protected package this(JobManager manager) { | |
85 threads = new Worker[10]; | |
86 this.manager = manager; | |
87 mutex = new Mutex; | |
88 condition = new Condition(mutex); | |
89 // this.defaultContextLoader = Thread.currentThread().getContextClassLoader(); | |
90 } | |
91 | |
92 /** | |
93 * Adds a worker to the list of workers. | |
94 */ | |
95 private void add(Worker worker) { | |
96 synchronized(mutex){ | |
97 int size = threads.length; | |
98 if (numThreads + 1 > size) { | |
99 Worker[] newThreads = new Worker[2 * size]; | |
100 System.arraycopy(threads, 0, newThreads, 0, size); | |
101 threads = newThreads; | |
102 } | |
103 threads[numThreads++] = worker; | |
104 } | |
105 } | |
106 | |
107 private void decrementBusyThreads() { | |
108 synchronized(mutex){ | |
109 //impossible to have less than zero busy threads | |
110 if (--busyThreads < 0) { | |
111 if (JobManager.DEBUG) | |
112 Assert.isTrue(false, Integer.toString(busyThreads)); | |
113 busyThreads = 0; | |
114 } | |
115 } | |
116 } | |
117 | |
118 /** | |
119 * Signals the end of a job. Note that this method can be called under | |
120 * OutOfMemoryError conditions and thus must be paranoid about allocating objects. | |
121 */ | |
122 protected void endJob(InternalJob job, IStatus result) { | |
123 decrementBusyThreads(); | |
124 //need to end rule in graph before ending job so that 2 threads | |
125 //do not become the owners of the same rule in the graph | |
126 if ((job.getRule_package() !is null) && !(cast(ThreadJob)job )) { | |
127 //remove any locks this thread may be owning on that rule | |
128 manager.getLockManager().removeLockCompletely(Thread.getThis(), job.getRule_package()); | |
129 } | |
130 manager.endJob_package(job, result, true); | |
131 //ensure this thread no longer owns any scheduling rules | |
132 manager.implicitJobs.endJob(job); | |
133 } | |
134 package void endJob_package(InternalJob job, IStatus result) { | |
135 endJob(job, result); | |
136 } | |
137 | |
138 /** | |
139 * Signals the death of a worker thread. Note that this method can be called under | |
140 * OutOfMemoryError conditions and thus must be paranoid about allocating objects. | |
141 */ | |
142 protected void endWorker(Worker worker) { | |
143 synchronized(mutex){ | |
144 if (remove(worker) && JobManager.DEBUG) | |
145 JobManager.debug_(Format("worker removed from pool: {}", worker)); //$NON-NLS-1$ | |
146 } | |
147 } | |
148 package void endWorker_package(Worker worker) { | |
149 endWorker(worker); | |
150 } | |
151 | |
152 private void incrementBusyThreads() { | |
153 synchronized(mutex){ | |
154 //impossible to have more busy threads than there are threads | |
155 if (++busyThreads > numThreads) { | |
156 if (JobManager.DEBUG) | |
157 Assert.isTrue(false, Format( "{},{}", busyThreads, numThreads)); | |
158 busyThreads = numThreads; | |
159 } | |
160 } | |
161 } | |
162 | |
163 /** | |
164 * Notification that a job has been added to the queue. Wake a worker, | |
165 * creating a new worker if necessary. The provided job may be null. | |
166 */ | |
167 protected package void jobQueued() { | |
168 synchronized(mutex){ | |
169 //if there is a sleeping thread, wake it up | |
170 if (sleepingThreads > 0) { | |
171 condition.notify(); | |
172 return; | |
173 } | |
174 //create a thread if all threads are busy | |
175 if (busyThreads >= numThreads) { | |
176 Worker worker = new Worker(this); | |
177 worker.isDaemon(isDaemon); | |
178 add(worker); | |
179 if (JobManager.DEBUG) | |
180 JobManager.debug_(Format("worker added to pool: {}", worker)); //$NON-NLS-1$ | |
181 worker.start(); | |
182 return; | |
183 } | |
184 } | |
185 } | |
186 | |
187 /** | |
188 * Remove a worker thread from our list. | |
189 * @return true if a worker was removed, and false otherwise. | |
190 */ | |
191 private bool remove(Worker worker) { | |
192 synchronized(mutex){ | |
193 for (int i = 0; i < threads.length; i++) { | |
194 if (threads[i] is worker) { | |
195 System.arraycopy(threads, i + 1, threads, i, numThreads - i - 1); | |
196 threads[--numThreads] = null; | |
197 return true; | |
198 } | |
199 } | |
200 return false; | |
201 } | |
202 } | |
203 | |
204 /** | |
205 * Sets whether threads created in the worker pool should be daemon threads. | |
206 */ | |
207 void setDaemon(bool value) { | |
208 this.isDaemon = value; | |
209 } | |
210 | |
211 protected void shutdown() { | |
212 synchronized(mutex){ | |
213 condition.notifyAll(); | |
214 } | |
215 } | |
216 package void shutdown_package() { | |
217 shutdown(); | |
218 } | |
219 | |
220 /** | |
221 * Sleep for the given duration or until woken. | |
222 */ | |
223 private void sleep(long duration) { | |
224 synchronized(mutex){ | |
225 sleepingThreads++; | |
226 busyThreads--; | |
227 if (JobManager.DEBUG) | |
228 JobManager.debug_(Format("worker sleeping for: {}ms", duration)); //$NON-NLS-1$ //$NON-NLS-2$ | |
229 try { | |
230 condition.wait(duration/1000.0f); | |
231 } catch (InterruptedException e) { | |
232 if (JobManager.DEBUG) | |
233 JobManager.debug_("worker interrupted while waiting... :-|"); //$NON-NLS-1$ | |
234 } finally { | |
235 sleepingThreads--; | |
236 busyThreads++; | |
237 } | |
238 } | |
239 } | |
240 | |
241 /** | |
242 * Returns a new job to run. Returns null if the thread should die. | |
243 */ | |
244 protected InternalJob startJob(Worker worker) { | |
245 //if we're above capacity, kill the thread | |
246 synchronized (mutex) { | |
247 if (!manager.isActive_package()) { | |
248 //must remove the worker immediately to prevent all threads from expiring | |
249 endWorker(worker); | |
250 return null; | |
251 } | |
252 //set the thread to be busy now in case of reentrant scheduling | |
253 incrementBusyThreads(); | |
254 } | |
255 Job job = null; | |
256 try { | |
257 job = manager.startJob_package(); | |
258 //spin until a job is found or until we have been idle for too long | |
259 long idleStart = System.currentTimeMillis(); | |
260 while (manager.isActive_package() && job is null) { | |
261 long hint = manager.sleepHint_package(); | |
262 if (hint > 0) | |
263 sleep(Math.min(hint, BEST_BEFORE)); | |
264 job = manager.startJob_package(); | |
265 //if we were already idle, and there are still no new jobs, then | |
266 // the thread can expire | |
267 synchronized (mutex) { | |
268 if (job is null && (System.currentTimeMillis() - idleStart > BEST_BEFORE) && (numThreads - busyThreads) > MIN_THREADS) { | |
269 //must remove the worker immediately to prevent all threads from expiring | |
270 endWorker(worker); | |
271 return null; | |
272 } | |
273 } | |
274 } | |
275 if (job !is null) { | |
276 //if this job has a rule, then we are essentially acquiring a lock | |
277 if ((job.getRule() !is null) && !(cast(ThreadJob)job )) { | |
278 //don't need to re-aquire locks because it was not recorded in the graph | |
279 //that this thread waited to get this rule | |
280 manager.getLockManager().addLockThread(Thread.getThis(), job.getRule()); | |
281 } | |
282 //see if we need to wake another worker | |
283 if (manager.sleepHint_package() <= 0) | |
284 jobQueued(); | |
285 } | |
286 } finally { | |
287 //decrement busy thread count if we're not running a job | |
288 if (job is null) | |
289 decrementBusyThreads(); | |
290 } | |
291 return job; | |
292 } | |
293 package InternalJob startJob_package(Worker worker) { | |
294 return startJob(worker); | |
295 } | |
296 } |