comparison lphobos/std/thread.d @ 131:5825d48b27d1 trunk

[svn r135] * Merged DMD 1.025 * * Fixed a minor linking order mishap * * Added a command line option -annotate * * Fixed some problems with running optimizations * * Added std.stdio and dependencies to lphobos (still not 100% working, but compiles and links) * * Fixed problems with passing aggregate types to variadic functions * * Added initial code towards full GC support, currently based on malloc and friends, not all the runtime calls the GC yet for memory * * Fixed problems with resolving nested function context pointers for some heavily nested cases * * Redid function argument passing + other minor code cleanups, still lots to do on this end... *
author lindquist
date Fri, 04 Jan 2008 01:38:42 +0100
parents
children 373489eeaf90
comparison
equal deleted inserted replaced
130:a7dfa0ed966c 131:5825d48b27d1
1 // Written in the D programming language
2
3 /*
4 * Copyright (C) 2002-2007 by Digital Mars, www.digitalmars.com
5 * Written by Walter Bright
6 *
7 * This software is provided 'as-is', without any express or implied
8 * warranty. In no event will the authors be held liable for any damages
9 * arising from the use of this software.
10 *
11 * Permission is granted to anyone to use this software for any purpose,
12 * including commercial applications, and to alter it and redistribute it
13 * freely, subject to the following restrictions:
14 *
15 * o The origin of this software must not be misrepresented; you must not
16 * claim that you wrote the original software. If you use this software
17 * in a product, an acknowledgment in the product documentation would be
18 * appreciated but is not required.
19 * o Altered source versions must be plainly marked as such, and must not
20 * be misrepresented as being the original software.
21 * o This notice may not be removed or altered from any source
22 * distribution.
23 */
24
25 /**************************
26 * The thread module defines the class $(B Thread).
27 *
28 * $(B Thread) is the basis
29 * for writing multithreaded applications. Each thread
30 * has a unique instance of class $(B Thread) associated with it.
31 * It is important to use the $(B Thread) class to create and manage
32 * threads as the garbage collector needs to know about all the threads.
33 * Macros:
34 * WIKI=Phobos/StdThread
35 */
36
37 module std.thread;
38
39 import std.c.stdio;
40
41 //debug=thread;
42
43 /* ================================ Win32 ================================= */
44
45 version (Win32)
46 {
47
48 private import std.c.windows.windows;
49
50 extern (Windows) alias uint (*stdfp)(void *);
51
52 extern (C)
53 thread_hdl _beginthreadex(void* security, uint stack_size,
54 stdfp start_addr, void* arglist, uint initflag,
55 thread_id* thrdaddr);
56
57 /**
58 * The type of the thread handle used by the operating system.
59 * For Windows, it is equivalent to a HANDLE from windows.d.
60 */
61 alias HANDLE thread_hdl;
62
63 alias uint thread_id;
64
65 /**
66 * Thrown for errors.
67 */
68 class ThreadError : Error
69 {
70 this(char[] s)
71 {
72 super("Thread error: " ~ s);
73 }
74 }
75
76 /**
77 * One of these is created for each thread.
78 */
79 class Thread
80 {
/**
 * Constructor for classes derived from Thread that override run().
 * No function pointer or delegate is recorded, so the default run()
 * has nothing to call. The optional stacksize parameter default value
 * of 0 will cause threads to be created with the default size for the
 * executable - Dave Fladebo
 */
this(size_t stacksize = 0)
{
    this.stacksize = stacksize;
}

/**
 * Constructor for threads whose body is the function fp; the default
 * run() calls fp(arg) and returns its result.
 */
this(int (*fp)(void *), void *arg, size_t stacksize = 0)
{
    this.stacksize = stacksize;
    this.arg = arg;
    this.fp = fp;
}

/**
 * Constructor for threads whose body is the delegate dg; the default
 * run() calls dg() when no function pointer was supplied.
 */
this(int delegate() dg, size_t stacksize = 0)
{
    this.stacksize = stacksize;
    this.dg = dg;
}
109
/**
 * Destructor
 *
 * If the thread hasn't been joined yet, release its OS handle.
 * A thread that was never started has no handle, so nothing is closed
 * in that case.
 */
~this()
{
    // wait() already closes and clears the handle when it marks the
    // thread FINISHED; a null handle means start() never succeeded.
    if (state != TS.FINISHED && hdl !is null)
    {
        CloseHandle(hdl);
        hdl = null;
    }
}
120
121 /**
122 * The handle to this thread assigned by the operating system. This is set
123 * to thread_hdl.init if the thread hasn't been started yet.
124 */
125 thread_hdl hdl;
126
127 void* stackBottom;
128
/**
 * Create a new thread and start it running. The new thread initializes
 * itself and then calls run(). start() can only be called once.
 * Throws: $(B ThreadError) if the thread was already started, if the
 * thread table is full, or if the thread could not be created.
 */
void start()
{
    if (state != TS.INITIAL)
        error("already started");

    synchronized (Thread.classinfo)
    {
        // Claim the first free slot of the fixed-size thread table.
        for (int i = 0; 1; i++)
        {
            if (i == allThreads.length)
                error("too many threads");
            if (!allThreads[i])
            {
                allThreads[i] = this;
                idx = i;
                if (i >= allThreadsDim)
                    allThreadsDim = i + 1;
                break;
            }
        }
        nthreads++;
    }

    state = TS.RUNNING;
    hdl = _beginthreadex(null, cast(uint)stacksize, &threadstart, cast(void*)this, 0, &id);
    if (hdl == cast(thread_hdl)0)
    {
        state = TS.FINISHED;
        // Roll back the whole registration, including the thread count
        // incremented above; otherwise nthreads stays too high forever.
        synchronized (Thread.classinfo)
        {
            allThreads[idx] = null;
            idx = -1;
            nthreads--;
        }
        error("failed to start");
    }
}
164
/**
 * Entry point for a thread. Unless overridden, it invokes the function
 * pointer fp with argument arg given to the constructor, or otherwise
 * the delegate dg. It is a fatal error if neither was supplied.
 * Returns: the thread exit code, which is normally 0.
 */
int run()
{
    // The function pointer takes precedence over the delegate.
    if (fp)
        return fp(arg);
    if (!dg)
        assert(0);
    return dg();
}
179
/*****************************
 * Wait for this thread to terminate.
 * Simply returns if thread has already terminated.
 * Throws: $(B ThreadError) if the thread hasn't begun yet or
 * is called on itself.
 */
void wait()
{
    if (isSelf)
        error("wait on self");
    // An unstarted thread has no handle to wait on; report it as the
    // documentation promises instead of marking the thread FINISHED.
    if (state == TS.INITIAL)
        error("wait on unstarted thread");
    if (state != TS.FINISHED)
    {
        // 0xFFFFFFFF requests an unbounded wait.
        WaitForSingleObject(hdl, 0xFFFFFFFF);
        state = TS.FINISHED;
        CloseHandle(hdl);
        hdl = null;
    }
}
199
/******************************
 * Wait for this thread to terminate or until milliseconds time has
 * elapsed, whichever occurs first.
 * Simply returns if thread has already terminated.
 * If the time elapses first, the thread keeps running and its state
 * and handle are left untouched, so it can be waited on again.
 * Throws: $(B ThreadError) if the thread hasn't begun yet or
 * is called on itself.
 */
void wait(uint milliseconds)
{
    if (isSelf)
        error("wait on self");
    if (state == TS.INITIAL)
        error("wait on unstarted thread");
    if (state != TS.FINISHED)
    {
        DWORD dw = WaitForSingleObject(hdl, milliseconds);

        // Only a result of 0 (WAIT_OBJECT_0) means the thread handle was
        // signaled. On timeout or failure the thread may still be
        // running, so it must not be marked FINISHED nor lose its handle.
        if (dw == 0)
        {
            state = TS.FINISHED;
            CloseHandle(hdl);
            hdl = null;
        }
    }
}
220
221 /**
222 * The state of a thread.
223 */
224 enum TS
225 {
226 INITIAL, /// The thread hasn't been started yet.
227 RUNNING, /// The thread is running or paused.
228 TERMINATED, /// The thread has ended.
229 FINISHED /// The thread has been cleaned up
230 }
231
232 /**
233 * Returns the state of a thread.
234 */
235 TS getState()
236 {
237 return state;
238 }
239
240 /**
241 * The priority of a thread.
242 */
243 enum PRIORITY
244 {
245 INCREASE, /// Increase thread priority
246 DECREASE, /// Decrease thread priority
247 IDLE, /// Assign thread low priority
248 CRITICAL /// Assign thread high priority
249 }
250
/**
 * Adjust the priority of this thread.
 * Params: p = the requested priority change, mapped onto the matching
 *             THREAD_PRIORITY_* constant.
 * Throws: ThreadError if cannot set priority
 */
void setPriority(PRIORITY p)
{
    int nPriority;

    switch (p)
    {
        case PRIORITY.INCREASE:
            nPriority = THREAD_PRIORITY_ABOVE_NORMAL;
            break;
        case PRIORITY.DECREASE:
            nPriority = THREAD_PRIORITY_BELOW_NORMAL;
            break;
        case PRIORITY.IDLE:
            nPriority = THREAD_PRIORITY_IDLE;
            break;
        case PRIORITY.CRITICAL:
            nPriority = THREAD_PRIORITY_TIME_CRITICAL;
            break;
        default:
            assert(0);
    }

    // SetThreadPriority reports failure by returning zero.
    // THREAD_PRIORITY_ERROR_RETURN is the failure value of
    // GetThreadPriority, so comparing against it never detected errors.
    if (!SetThreadPriority(hdl, nPriority))
        error("set priority");
}
280
281 /**
282 * Returns true if this thread is the current thread.
283 */
284 bool isSelf()
285 {
286 //printf("id = %d, self = %d\n", id, pthread_self());
287 return (id == GetCurrentThreadId());
288 }
289
/**
 * Returns a reference to the Thread for the thread that called the
 * function. The thread table is searched for an entry whose id equals
 * the caller's OS thread id; failing to find one is a fatal error.
 */
static Thread getThis()
{
    thread_id self = GetCurrentThreadId();

    for (int slot = 0; slot < allThreadsDim; slot++)
    {
        Thread candidate = allThreads[slot];
        // Skip free slots and entries belonging to other threads.
        if (!candidate || candidate.id != self)
            continue;
        return candidate;
    }
    printf("didn't find it\n");
    assert(0);
}
309
310 /**
311 * Returns an array of all the threads currently running.
312 */
313 static Thread[] getAll()
314 {
315 synchronized (Thread.classinfo) return allThreads[0 .. allThreadsDim];
316 }
317
/**
 * Suspend execution of this thread.
 * Throws: ThreadError if the thread is not in the RUNNING state or the
 * suspend call reports failure (0xFFFFFFFF).
 */
void pause()
{
    if (state != TS.RUNNING)
        error("cannot pause");
    if (SuspendThread(hdl) == 0xFFFFFFFF)
        error("cannot pause");
}
326
/**
 * Resume execution of this thread.
 * Throws: ThreadError if the thread is not in the RUNNING state or the
 * resume call reports failure (0xFFFFFFFF).
 */
void resume()
{
    if (state != TS.RUNNING)
        error("cannot resume");
    if (ResumeThread(hdl) == 0xFFFFFFFF)
        error("cannot resume");
}
335
/**
 * Suspend execution of all threads but this thread.
 * Runs under the Thread class lock and does nothing when only one
 * thread is registered.
 */
static void pauseAll()
{
    synchronized (Thread.classinfo)
    {
        if (nthreads <= 1)
            return;

        thread_id self = GetCurrentThreadId();

        for (int slot = 0; slot < allThreadsDim; slot++)
        {
            Thread other = allThreads[slot];
            // Skip free slots, the caller itself and non-running threads.
            if (!other || other.id == self || other.state != TS.RUNNING)
                continue;
            other.pause();
        }
    }
}
356
/**
 * Resume execution of all paused threads.
 * Runs under the Thread class lock; every registered thread other than
 * the caller that is in the RUNNING state gets resume() called on it.
 */
static void resumeAll()
{
    synchronized (Thread.classinfo)
    {
        if (nthreads <= 1)
            return;

        thread_id self = GetCurrentThreadId();

        for (int slot = 0; slot < allThreadsDim; slot++)
        {
            Thread other = allThreads[slot];
            if (!other || other.id == self || other.state != TS.RUNNING)
                continue;
            other.resume();
        }
    }
}
377
378 /**
379 * Give up the remainder of this thread's time slice.
380 */
381 static void yield()
382 {
383 Sleep(0);
384 }
385
386 /**
387 *
388 */
389 static uint nthreads = 1;
390
391 private:
392
393 static uint allThreadsDim;
394 static Thread[0x400] allThreads; // length matches value in C runtime
395
396 TS state;
397 int idx = -1; // index into allThreads[]
398 thread_id id;
399 size_t stacksize = 0;
400
401 int (*fp)(void *);
402 void *arg;
403
404 int delegate() dg;
405
406 void error(char[] msg)
407 {
408 throw new ThreadError(msg);
409 }
410
411
/* ***********************************************
 * Wrapper that interfaces between the C runtime and Thread.run().
 * p is the Thread object handed to _beginthreadex by start(). The
 * value returned is run()'s result, or 1 if run() threw.
 */

extern (Windows) static uint threadstart(void *p)
{
    Thread thr = cast(Thread)p;
    int exitCode;

    debug (thread) printf("Starting thread %d\n", thr.idx);
    thr.stackBottom = os_query_stackBottom();
    try
    {
        exitCode = thr.run();
    }
    catch (Object o)
    {
        // Anything thrown out of run() is reported and turned into
        // exit code 1 rather than escaping the thread entry point.
        fprintf(stderr, "Error: %.*s\n", o.toString());
        exitCode = 1;
    }

    debug (thread) printf("Ending thread %d\n", thr.idx);
    thr.state = TS.TERMINATED;
    // Give the table slot back and drop the thread count.
    synchronized (Thread.classinfo)
    {
        allThreads[thr.idx] = null;
        thr.idx = -1;
        nthreads--;
    }
    return exitCode;
}
443
444
/**************************************
 * Create a Thread for global main().
 * The calling thread is registered as RUNNING in slot 0 of the thread
 * table, which must still be empty.
 */

public static void thread_init()
{
    Thread mainThread = new Thread();

    mainThread.state = TS.RUNNING;
    mainThread.id = GetCurrentThreadId();
    mainThread.hdl = Thread.getCurrentThreadHandle();
    mainThread.stackBottom = os_query_stackBottom();

    assert(!allThreads[0]);
    mainThread.idx = 0;
    allThreads[0] = mainThread;
    allThreadsDim = 1;
}
463
/* Module shutdown: if any thread slot was ever used, close the handle
 * stored for slot 0 (set up by thread_init()) and replace it with the
 * value returned by GetCurrentThread().
 */
static ~this()
{
    if (!allThreadsDim)
        return;

    Thread first = allThreads[0];
    CloseHandle(first.hdl);
    first.hdl = GetCurrentThread();
}
472
/********************************************
 * Returns the handle of the current thread.
 * This is needed because GetCurrentThread() always returns -2 which
 * is a pseudo-handle representing the current thread.
 * The returned thread handle is a windows resource and must be explicitly
 * closed.
 * The result of DuplicateHandle() is not checked here.
 * Many thanks to Justin (jhenzie@mac.com) for figuring this out
 * and providing the fix.
 */
static thread_hdl getCurrentThreadHandle()
{
    // Obtained via GetCurrentProcess() rather than a hard-coded -1:
    // http://www.digitalmars.com/drn-bin/wwwnews?D/21217
    thread_hdl process = GetCurrentProcess();
    thread_hdl pseudoHandle = GetCurrentThread();
    thread_hdl duplicated;

    // Passed as the last (options) argument of DuplicateHandle.
    // NOTE(review): 0x2 is presumably DUPLICATE_SAME_ACCESS - confirm.
    uint options = cast(uint)0x00000002;

    DuplicateHandle(process, pseudoHandle, process,
                    &duplicated, cast(uint)0, TRUE, options);

    return duplicated;
}
498 }
499
500
501 /**********************************************
502 * Determine "bottom" of stack (actually the top on Win32 systems).
 * Returns: the value loaded from FS:4, left in EAX as the result.
 * NOTE(review): FS:4 is presumably the stack-base field of the Win32
 * thread information block - confirm against platform documentation.
503 */
504
505 void *os_query_stackBottom()
506 {
507 asm
508 {
// naked: the function consists of exactly these instructions, so the
// explicit ret below is its only return path.
509 naked ;
510 mov EAX,FS:4 ;
511 ret ;
512 }
513 }
514
515 }
516
517 /* ================================ linux ================================= */
518
519 version (linux)
520 {
521
522 private import std.c.linux.linux;
523 private import std.c.linux.linuxextern;
524 private import llvm.intrinsic;
525
526 alias uint pthread_t;
527 extern (C) alias void (*__sighandler_t)(int);
528
529 struct sigset_t
530 {
531 uint __val[1024 / (8 * uint.sizeof)];
532 }
533
534 struct sigaction_t
535 {
536 __sighandler_t sa_handler;
537 sigset_t sa_mask;
538 int sa_flags;
539 void (*sa_restorer)();
540 }
541
/* D mirror of the C library's thread attribute object, handed to
 * pthread_attr_init(), pthread_attr_setstacksize() and pthread_create().
 *
 * In D, a nested struct declaration only introduces a type; it does not
 * add a member. Without an explicit instance the scheduling parameter
 * would be missing from the layout and every following member would sit
 * one int too early. The nested type name is kept and an instance member
 * is added to restore the slot.
 * NOTE(review): the member order is assumed to follow the C header this
 * was transcribed from - confirm against the target libc.
 */
struct pthread_attr_t
{
    int __detachstate;
    int __schedpolicy;
    struct __schedparam
    {
        int __sched_priority;
    }
    __schedparam __schedparam_value;    // the actual scheduling-parameter slot
    int __inheritsched;
    int __scope;
    size_t __guardsize;
    int __stackaddr_set;
    void *__stackaddr;
    size_t __stacksize;
}
557
558 unittest
559 {
560 assert(sigset_t.sizeof == 128);
561 assert(sigaction_t.sizeof == 140);
562 assert(sem_t.sizeof == 16);
563 }
564
565 extern (C)
566 {
567 int pthread_create(pthread_t*, void*, void* (*)(void*), void*);
568 int pthread_join(pthread_t, void**);
569 int pthread_kill(pthread_t, int);
570 pthread_t pthread_self();
571 int pthread_equal(pthread_t, pthread_t);
572 int pthread_attr_init(pthread_attr_t*);
573 int pthread_attr_setstacksize(pthread_attr_t *, size_t);
574 int pthread_cancel(pthread_t);
575 int pthread_setcancelstate(int, int*);
576 int pthread_setcanceltype(int, int*);
577 int sched_yield();
578 int sigfillset(sigset_t*);
579 int sigdelset(sigset_t*, int);
580 int sigaction(int, sigaction_t*, sigaction_t*);
581 int sigsuspend(sigset_t*);
582
583 enum
584 {
585 PTHREAD_CANCEL_ENABLE,
586 PTHREAD_CANCEL_DISABLE
587 }
588
589 enum
590 {
591 PTHREAD_CANCEL_DEFERRED,
592 PTHREAD_CANCEL_ASYNCHRONOUS
593 }
594 }
595
// Error type thrown by Thread.error() for every failure reported by the
// linux implementation of this module.
596 class ThreadError : Error
597 {
// The message passed to the base class is s prefixed with "Thread error: ".
598 this(char[] s)
599 {
600 super("Thread error: " ~ s);
601 }
602 }
603
604 class Thread
605 {
// Constructor for subclasses that override run(): no function pointer or
// delegate is recorded.
// The optional stacksize parameter default value of 0 will cause threads
// to be created with the default pthread size - Dave Fladebo
this(size_t stacksize = 0)
{
    // init() prepares the thread attributes, wait mutex and wait condition.
    init(stacksize);
}

// Constructor for threads whose body is fp; the default run() calls fp(arg).
this(int (*fp)(void *), void *arg, size_t stacksize = 0)
{
    this.arg = arg;
    this.fp = fp;
    init(stacksize);
}

// Constructor for threads whose body is the delegate dg.
this(int delegate() dg, size_t stacksize = 0)
{
    this.dg = dg;
    init(stacksize);
}
625
// Destructor: releases the wait condition and mutex created by init()
// and detaches the thread if it was started but never joined.
~this()
{
    pthread_cond_destroy(&waitCond);
    pthread_mutex_destroy(&waitMtx);
    // A thread still in the INITIAL state was never created, so id was
    // never assigned and must not be handed to pthread_detach().
    if (state == TS.RUNNING || state == TS.TERMINATED)
        pthread_detach(id);
}
633
634 pthread_t id;
635 void* stackBottom;
636 void* stackTop;
637
// Create a new thread and start it running; the new thread ends up in
// threadstart(), which calls run(). start() can only be called once.
// Throws ThreadError if the thread was already started, if the thread
// table is full, or if pthread_create() fails.
void start()
{
    if (state != TS.INITIAL)
        error("already started");

    synchronized (Thread.classinfo)
    {
        // Claim the first free slot of the fixed-size thread table.
        for (int i = 0; 1; i++)
        {
            if (i == allThreads.length)
                error("too many threads");
            if (!allThreads[i])
            {
                allThreads[i] = this;
                idx = i;
                if (i >= allThreadsDim)
                    allThreadsDim = i + 1;
                break;
            }
        }
        nthreads++;
    }

    state = TS.RUNNING;
    // Create with thread attributes to allow non-default stack size - Dave Fladebo
    int result = pthread_create(&id, &threadAttrs, &threadstart, cast(void*)this);
    if (result)
    {
        state = TS.FINISHED;
        // Roll back the whole registration, including the thread count
        // incremented above; otherwise nthreads stays too high forever.
        synchronized (Thread.classinfo)
        {
            allThreads[idx] = null;
            idx = -1;
            nthreads--;
        }
        error("failed to start");       // BUG: should report errno
    }
}
673
// Thread body. Unless overridden, calls fp(arg) when a function pointer
// was supplied, otherwise the delegate dg; having neither is fatal.
// Returns the value produced by the function or delegate.
int run()
{
    if (fp)
        return fp(arg);
    if (!dg)
        assert(0);
    return dg();
}
682
// Wait for this thread to terminate by joining it. Returns immediately
// if the thread has already been joined.
// Throws ThreadError when called on the current thread, on a thread that
// was never started, or when pthread_join() fails.
void wait()
{
    if (isSelf)
        error("wait on self");

    // id is only assigned once the thread has been created; joining an
    // unstarted thread would pass an id that was never assigned.
    if (state == TS.INITIAL)
        error("wait on unstarted thread");

    if (state != TS.FINISHED)
    {
        void *value;

        int result = pthread_join(id, &value);
        // Marked FINISHED even on failure so the join is not retried.
        state = TS.FINISHED;
        if (result)
            error("failed to wait");
    }
}
698
// Timed wait, implemented for POSIX systems by Dave Fladebo.
// Blocks on waitCond for at most the given number of milliseconds. If
// the condition is signalled (see threadstart()) the thread is joined
// via wait(); if the timed wait returns non-zero the thread is cancelled
// and removed from the thread table.
// Throws ThreadError when called on the current thread, on a thread that
// was never started, or when the thread cannot be cancelled.
void wait(uint milliseconds)
{
    if (isSelf)
        error("wait on self");
    if (state == TS.INITIAL)
        error("wait on unstarted thread");
    if (state != TS.FINISHED)
    {
        timespec ts;
        timeval tv;

        pthread_mutex_lock(&waitMtx);
        // Build the absolute deadline: now + milliseconds.
        gettimeofday(&tv, null);
        ts.tv_sec = cast(__time_t)tv.tv_sec + cast(__time_t)(milliseconds / 1_000);
        ts.tv_nsec = (tv.tv_usec * 1_000) + ((milliseconds % 1_000) * 1_000_000);
        // tv_nsec must stay below one second. The comparison has to be
        // >= because exactly 1_000_000_000 is already out of range and
        // would make the timed wait fail instead of waiting.
        if (ts.tv_nsec >= 1_000_000_000)
        {
            ts.tv_sec += 1;
            ts.tv_nsec -= 1_000_000_000;
        }
        if (pthread_cond_timedwait(&waitCond, &waitMtx, &ts))
        {
            int oldstate, oldtype;
            pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, &oldstate);
            pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, &oldtype);

            // thread was not completed in the timeout period, cancel it
            int cancelResult = pthread_cancel(id);

            // Restore the previous cancel settings before any error is
            // raised, so the failure path does not leave them changed.
            pthread_setcancelstate(oldstate, null);
            pthread_setcanceltype(oldtype, null);

            if (cancelResult)
            {
                pthread_mutex_unlock(&waitMtx);
                error("cannot terminate thread via timed wait");
            }

            state = TS.TERMINATED;
            synchronized (Thread.classinfo)
            {
                allThreads[idx] = null;
                idx = -1;
                nthreads--;
            }

            pthread_mutex_unlock(&waitMtx);
        }
        else
        {
            pthread_mutex_unlock(&waitMtx);
            wait();     // condition has been signalled (see threadstart()), terminate normally
        }
    }
}
750
751 enum TS
752 {
753 INITIAL, // created
754 RUNNING, // running
755 TERMINATED, // execution finished
756 FINISHED // pthread_join()'ed
757 }
758
759 TS getState()
760 {
761 return state;
762 }
763
764 enum PRIORITY
765 {
766 INCREASE,
767 DECREASE,
768 IDLE,
769 CRITICAL
770 }
771
772 void setPriority(PRIORITY p)
773 {
774 /+ not implemented
775 int nPriority;
776
777 switch (p)
778 {
779 case PRIORITY.INCREASE:
780 nPriority = THREAD_PRIORITY_ABOVE_NORMAL;
781 break;
782 case PRIORITY.DECREASE:
783 nPriority = THREAD_PRIORITY_BELOW_NORMAL;
784 break;
785 case PRIORITY.IDLE:
786 nPriority = THREAD_PRIORITY_IDLE;
787 break;
788 case PRIORITY.CRITICAL:
789 nPriority = THREAD_PRIORITY_TIME_CRITICAL;
790 break;
791 }
792
793 if (SetThreadPriority(hdl, nPriority) == THREAD_PRIORITY_ERROR_RETURN)
794 error("set priority");
795 +/
796 }
797
798 int isSelf()
799 {
800 //printf("id = %d, self = %d\n", id, pthread_self());
801 return pthread_equal(pthread_self(), id);
802 }
803
// Returns the Thread object registered for the calling thread, found by
// comparing pthread_self() against the id of every table entry.
// Not finding an entry is a fatal error.
static Thread getThis()
{
    pthread_t id = pthread_self();

    for (int i = 0; i < allThreadsDim; i++)
    {
        Thread t = allThreads[i];
        if (t && pthread_equal(id, t.id))
        {
            return t;
        }
    }
    printf("didn't find it\n");
    // assert(0) is the special always-failing form that survives release
    // builds; assert(null) is an ordinary assert that a release build
    // removes, letting control run off the end without a return value.
    assert(0);
}
821
822 static Thread[] getAll()
823 {
824 synchronized (Thread.classinfo) return allThreads[0 .. allThreadsDim];
825 }
826
// Suspend this thread: send it SIGUSR1 and block on flagSuspend until
// the signalled thread acknowledges (see pauseHandler()).
// Throws ThreadError if the thread is not RUNNING or the signal cannot
// be delivered.
void pause()
{
    if (state != TS.RUNNING)
        error("cannot pause");
    if (pthread_kill(id, SIGUSR1))
        error("cannot pause");
    sem_wait(&flagSuspend);     // wait for acknowledgement
}
839
// Resume this thread by sending it SIGUSR2 (see resumeHandler()).
// Throws ThreadError if the thread is not RUNNING or the signal cannot
// be delivered.
void resume()
{
    if (state != TS.RUNNING)
        error("cannot resume");
    if (pthread_kill(id, SIGUSR2))
        error("cannot resume");
}
850
// Suspend every registered RUNNING thread except the caller. All threads
// are signalled with SIGUSR1 first; afterwards one acknowledgement per
// signalled thread is collected from flagSuspend. Runs under the Thread
// class lock and does nothing when only one thread is registered.
static void pauseAll()
{
    synchronized (Thread.classinfo)
    {
        if (nthreads <= 1)
            return;

        pthread_t self = pthread_self();
        int pending = 0;        // count of paused threads

        for (int slot = 0; slot < allThreadsDim; slot++)
        {
            Thread other = allThreads[slot];
            // Skip free slots, the caller itself and non-running threads.
            if (!other || pthread_equal(self, other.id) || other.state != TS.RUNNING)
                continue;
            if (pthread_kill(other.id, SIGUSR1))
                other.error("cannot pause");
            pending++;
        }

        // Wait for each paused thread to acknowledge
        for (; pending > 0; pending--)
            sem_wait(&flagSuspend);
    }
}
880
// Resume every registered RUNNING thread except the caller by calling
// resume() on it. Runs under the Thread class lock and does nothing when
// only one thread is registered.
static void resumeAll()
{
    synchronized (Thread.classinfo)
    {
        if (nthreads > 1)
        {
            pthread_t thisid = pthread_self();

            for (int i = 0; i < allThreadsDim; i++)
            {
                Thread t = allThreads[i];
                // Compare ids with pthread_equal(), as pauseAll(),
                // getThis() and isSelf() do, instead of a raw !=.
                if (t && !pthread_equal(thisid, t.id) && t.state == TS.RUNNING)
                    t.resume();
            }
        }
    }
}
898
899 static void yield()
900 {
901 sched_yield();
902 }
903
904 static uint nthreads = 1;
905
906 private:
907
908 static uint allThreadsDim;
909
910 // Set max to Windows equivalent for compatibility.
911 // pthread_create will fail gracefully if stack limit
912 // is reached prior to allThreads max.
913 static Thread[0x400] allThreads;
914
915 static sem_t flagSuspend;
916
917 TS state;
918 int idx = -1; // index into allThreads[]
919 int flags = 0;
920
921 pthread_attr_t threadAttrs;
922 pthread_mutex_t waitMtx;
923 pthread_cond_t waitCond;
924
925 int (*fp)(void *);
926 void *arg;
927
928 int delegate() dg;
929
930 void error(char[] msg)
931 {
932 throw new ThreadError(msg);
933 }
934
// Shared constructor helper: prepares the thread attributes, the wait
// mutex and the wait condition used by the timed wait.
// Throws ThreadError if the stack size cannot be set or if the mutex or
// condition cannot be initialized.
void init(size_t stackSize)
{
    // Always initialized to defaults: passing default attributes to
    // pthread_create() is equivalent to passing null.
    pthread_attr_init(&threadAttrs);

    // A stack size of 0 keeps the pthread default.
    if (stackSize > 0 && pthread_attr_setstacksize(&threadAttrs, stackSize))
        error("cannot set stack size");

    if (pthread_mutex_init(&waitMtx, null))
        error("cannot initialize wait mutex");

    if (pthread_cond_init(&waitCond, null))
        error("cannot initialize wait condition");
}
953
/************************************************
 * Wrapper that interfaces between the C runtime and Thread.run().
 * p is the Thread object handed to pthread_create() by start(). The
 * value returned is run()'s result, or 1 if run() threw, cast to void*.
 */

extern (C) static void *threadstart(void *p)
{
    Thread thr = cast(Thread)p;
    int exitCode;

    debug (thread) printf("Starting thread x%x (%d)\n", thr, thr.idx);

    // The new thread can be running before pthread_create() has stored
    // the id in start(), so record it from inside the thread as well.
    thr.id = pthread_self();

    thr.stackBottom = getESP();
    try
    {
        // signal the wait condition (see the timed wait function)
        if (thr.state == TS.RUNNING)
            pthread_cond_signal(&thr.waitCond);
        exitCode = thr.run();
    }
    catch (Object o)
    {
        // Anything thrown out of run() is reported and turned into
        // exit code 1 rather than escaping the thread entry point.
        fprintf(stderr, "Error: %.*s\n", o.toString());
        exitCode = 1;
    }

    debug (thread) printf("Ending thread %d\n", thr.idx);
    thr.state = TS.TERMINATED;
    // Give the table slot back and drop the thread count.
    synchronized (Thread.classinfo)
    {
        allThreads[thr.idx] = null;
        thr.idx = -1;
        nthreads--;
    }
    return cast(void*)exitCode;
}
992
993
/**************************************
 * Create a Thread for global main().
 * Registers the calling thread in slot 0 of the thread table, installs
 * the SIGUSR1/SIGUSR2 handlers used to suspend and resume threads, and
 * initializes the acknowledgement semaphore.
 * Throws: ThreadError if any of the signal or semaphore calls fails.
 */

public static void thread_init()
{
    Thread mainThread = new Thread();

    mainThread.state = TS.RUNNING;
    mainThread.id = pthread_self();

    version (none)
    {
        // See discussion: http://autopackage.org/forums/viewtopic.php?t=22
        static void** libc_stack_end;

        if (libc_stack_end == libc_stack_end.init)
        {
            void* handle = dlopen(null, RTLD_NOW);
            libc_stack_end = cast(void **)dlsym(handle, "__libc_stack_end");
            dlclose(handle);
        }
        mainThread.stackBottom = *libc_stack_end;
    }
    else
    {
        mainThread.stackBottom = cast(void*)__libc_stack_end;
    }

    assert(!allThreads[0]);
    allThreads[0] = mainThread;
    allThreadsDim = 1;
    mainThread.idx = 0;

    /* Install signal handlers so we can suspend/resume threads.
     * error() throws, so each failed step aborts the remaining ones.
     */
    sigaction_t action;

    if (sigfillset(&action.sa_mask))
        mainThread.error("cannot initialize threads");

    action.sa_handler = &pauseHandler;
    if (sigaction(SIGUSR1, &action, null))
        mainThread.error("cannot initialize threads");

    action.sa_handler = &resumeHandler;
    if (sigaction(SIGUSR2, &action, null))
        mainThread.error("cannot initialize threads");

    if (sem_init(&flagSuspend, 0, 0))
        mainThread.error("cannot initialize threads");
}
1054
/**********************************
 * This gets called when a thread gets SIGUSR1.
 * It records the thread's current stack position, acknowledges the
 * pause through flagSuspend, and then sleeps until resumeHandler() has
 * set bit 0 of the thread's flags.
 */

extern (C) static void pauseHandler(int sig)
{
    // Save all registers on the stack so they'll be scanned by the GC
    version(none) asm
    {
        pusha ;
    }

    assert(sig == SIGUSR1);

    // Mask of every signal except SIGUSR2, used while suspended.
    sigset_t waitMask;
    int rc = sigfillset(&waitMask);
    assert(rc == 0);
    rc = sigdelset(&waitMask, SIGUSR2);
    assert(rc == 0);

    Thread self = getThis();
    self.stackTop = getESP();
    self.flags &= ~1;
    // Release the semaphore _after_ stackTop is set
    sem_post(&flagSuspend);
    do
    {
        sigsuspend(&waitMask);          // suspend until SIGUSR2
    } while (!(self.flags & 1));        // ensure it was resumeHandler()

    // Restore all registers
    version(none) asm
    {
        popa ;
    }
}
1094
/**********************************
 * This gets called when a thread gets SIGUSR2.
 * Sets bit 0 of the calling thread's flags, which is the condition
 * pauseHandler() waits for before returning.
 */

extern (C) static void resumeHandler(int sig)
{
    getThis().flags |= 1;
}
1105
// Returns an address inside the caller's current stack region: the ESP
// register where x86 inline assembly is available, otherwise the frame
// address reported by llvm_frameaddress(0).
public static void* getESP()
{
    version(D_InlineAsm_X86)
    {
        asm
        {
            naked ;
            mov EAX,ESP ;
            ret ;
        }
    }
    else
    {
        void* frame = llvm_frameaddress(0);
        assert(frame !is null);
        return frame;
    }
}
1123 }
1124
1125
1126 }
1127