blob: bbce5053d28f2b1723a100b2c04231d26531b58a [file] [log] [blame]
Tor Norbye3a2425a52013-11-04 10:16:08 -08001from java.lang import InterruptedException
2from java.util import Collections, WeakHashMap
3from java.util.concurrent import Semaphore, CyclicBarrier
4from java.util.concurrent.locks import ReentrantLock
5from org.python.util import jython
6from org.python.core import Py
7from thread import _newFunctionThread
8from thread import _local as local
9from _threading import Lock, RLock, Condition, _Lock, _RLock, _threads, _active, _jthread_to_pythread, _register_thread, _unregister_thread
10import java.lang.Thread
11import sys as _sys
12from traceback import print_exc as _print_exc
13
14# Rename some stuff so "from threading import *" is safe
15__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
16 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
17 'Timer', 'setprofile', 'settrace', 'local', 'stack_size']
18
19_VERBOSE = False
20
21if __debug__:
22
23 class _Verbose(object):
24
25 def __init__(self, verbose=None):
26 if verbose is None:
27 verbose = _VERBOSE
28 self.__verbose = verbose
29
30 def _note(self, format, *args):
31 if self.__verbose:
32 format = format % args
33 format = "%s: %s\n" % (
34 currentThread().getName(), format)
35 _sys.stderr.write(format)
36
37else:
38 # Disable this when using "python -O"
39 class _Verbose(object):
40 def __init__(self, verbose=None):
41 pass
42 def _note(self, *args):
43 pass
44
45# Support for profile and trace hooks
46
47_profile_hook = None
48_trace_hook = None
49
50def setprofile(func):
51 global _profile_hook
52 _profile_hook = func
53
54def settrace(func):
55 global _trace_hook
56 _trace_hook = func
57
58
59class Semaphore(object):
60 def __init__(self, value=1):
61 if value < 0:
62 raise ValueError("Semaphore initial value must be >= 0")
63 self._semaphore = java.util.concurrent.Semaphore(value)
64
65 def acquire(self, blocking=True):
66 if blocking:
67 self._semaphore.acquire()
68 return True
69 else:
70 return self._semaphore.tryAcquire()
71
72 def __enter__(self):
73 self.acquire()
74 return self
75
76 def release(self):
77 self._semaphore.release()
78
79 def __exit__(self, t, v, tb):
80 self.release()
81
82
83ThreadStates = {
84 java.lang.Thread.State.NEW : 'initial',
85 java.lang.Thread.State.RUNNABLE: 'started',
86 java.lang.Thread.State.BLOCKED: 'started',
87 java.lang.Thread.State.WAITING: 'started',
88 java.lang.Thread.State.TIMED_WAITING: 'started',
89 java.lang.Thread.State.TERMINATED: 'stopped',
90}
91
92class JavaThread(object):
93 def __init__(self, thread):
94 self._thread = thread
95 _register_thread(thread, self)
96
97 def __repr__(self):
98 _thread = self._thread
99 status = ThreadStates[_thread.getState()]
100 if _thread.isDaemon(): status + " daemon"
101 return "<%s(%s, %s)>" % (self.__class__.__name__, self.getName(), status)
102
103 def __eq__(self, other):
104 if isinstance(other, JavaThread):
105 return self._thread == other._thread
106 else:
107 return False
108
109 def __ne__(self, other):
110 return not self.__eq__(other)
111
112 def start(self):
113 self._thread.start()
114
115 def run(self):
116 self._thread.run()
117
118 def join(self, timeout=None):
119 if timeout:
120 millis = timeout * 1000.
121 millis_int = int(millis)
122 nanos = int((millis - millis_int) * 1e6)
123 self._thread.join(millis_int, nanos)
124 else:
125 self._thread.join()
126
127 def getName(self):
128 return self._thread.getName()
129
130 def setName(self, name):
131 self._thread.setName(str(name))
132
133 def isAlive(self):
134 return self._thread.isAlive()
135
136 def isDaemon(self):
137 return self._thread.isDaemon()
138
139 def setDaemon(self, daemonic):
140 self._thread.setDaemon(bool(daemonic))
141
142 def __tojava__(self, c):
143 if isinstance(self._thread, c):
144 return self._thread
145 if isinstance(self, c):
146 return self
147 return Py.NoConversion
148
149
150class Thread(JavaThread):
151 def __init__(self, group=None, target=None, name=None, args=None, kwargs=None):
152 assert group is None, "group argument must be None for now"
153 _thread = self._create_thread()
154 JavaThread.__init__(self, _thread)
155 if args is None:
156 args = ()
157 if kwargs is None:
158 kwargs = {}
159 self._target = target
160 self._args = args
161 self._kwargs = kwargs
162 if name:
163 self._thread.setName(str(name))
164
165 def _create_thread(self):
166 return _newFunctionThread(self.__bootstrap, ())
167
168 def run(self):
169 if self._target:
170 self._target(*self._args, **self._kwargs)
171
172 def __bootstrap(self):
173 try:
174 if _trace_hook:
175 _sys.settrace(_trace_hook)
176 if _profile_hook:
177 _sys.setprofile(_profile_hook)
178 try:
179 self.run()
180 except SystemExit:
181 pass
182 except InterruptedException:
183 # Quiet InterruptedExceptions if they're caused by
184 # _systemrestart
185 if not jython.shouldRestart:
186 raise
187 except:
188 # If sys.stderr is no more (most likely from interpreter
189 # shutdown) use self.__stderr. Otherwise still use sys (as in
190 # _sys) in case sys.stderr was redefined.
191 if _sys:
192 _sys.stderr.write("Exception in thread %s:" %
193 self.getName())
194 _print_exc(file=_sys.stderr)
195 else:
196 # Do the best job possible w/o a huge amt. of code to
197 # approx. a traceback stack trace
198 exc_type, exc_value, exc_tb = self.__exc_info()
199 try:
200 print>>self.__stderr, (
201 "Exception in thread " + self.getName() +
202 " (most likely raised during interpreter shutdown):")
203 print>>self.__stderr, (
204 "Traceback (most recent call last):")
205 while exc_tb:
206 print>>self.__stderr, (
207 ' File "%s", line %s, in %s' %
208 (exc_tb.tb_frame.f_code.co_filename,
209 exc_tb.tb_lineno,
210 exc_tb.tb_frame.f_code.co_name))
211 exc_tb = exc_tb.tb_next
212 print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
213 # Make sure that exc_tb gets deleted since it is a memory
214 # hog; deleting everything else is just for thoroughness
215 finally:
216 del exc_type, exc_value, exc_tb
217
218 finally:
219 self.__stop()
220 try:
221 self.__delete()
222 except:
223 pass
224
225 def __stop(self):
226 pass
227
228 def __delete(self):
229 _unregister_thread(self._thread)
230
231
232class _MainThread(Thread):
233 def __init__(self):
234 Thread.__init__(self, name="MainThread")
235 import atexit
236 atexit.register(self.__exitfunc)
237
238 def _create_thread(self):
239 return java.lang.Thread.currentThread()
240
241 def _set_daemon(self):
242 return False
243
244 def __exitfunc(self):
245 _unregister_thread(self._thread)
246 t = _pickSomeNonDaemonThread()
247 while t:
248 t.join()
249 t = _pickSomeNonDaemonThread()
250
251def _pickSomeNonDaemonThread():
252 for t in enumerate():
253 if not t.isDaemon() and t.isAlive():
254 return t
255 return None
256
257def currentThread():
258 jthread = java.lang.Thread.currentThread()
259 pythread = _jthread_to_pythread[jthread]
260 if pythread is None:
261 pythread = JavaThread(jthread)
262 return pythread
263
264def activeCount():
265 return len(_threads)
266
267def enumerate():
268 return _threads.values()
269
270from thread import stack_size
271
272
273_MainThread()
274
275
276######################################################################
277# pure Python code from CPythonLib/threading.py
278
279# The timer class was contributed by Itamar Shtull-Trauring
280
281def Timer(*args, **kwargs):
282 return _Timer(*args, **kwargs)
283
284class _Timer(Thread):
285 """Call a function after a specified number of seconds:
286
287 t = Timer(30.0, f, args=[], kwargs={})
288 t.start()
289 t.cancel() # stop the timer's action if it's still waiting
290 """
291
292 def __init__(self, interval, function, args=[], kwargs={}):
293 Thread.__init__(self)
294 self.interval = interval
295 self.function = function
296 self.args = args
297 self.kwargs = kwargs
298 self.finished = Event()
299
300 def cancel(self):
301 """Stop the timer if it hasn't finished yet"""
302 self.finished.set()
303
304 def run(self):
305 self.finished.wait(self.interval)
306 if not self.finished.isSet():
307 self.function(*self.args, **self.kwargs)
308 self.finished.set()
309
310
311# NOT USED except by BoundedSemaphore
312class _Semaphore(_Verbose):
313
314 # After Tim Peters' semaphore class, but not quite the same (no maximum)
315
316 def __init__(self, value=1, verbose=None):
317 assert value >= 0, "Semaphore initial value must be >= 0"
318 _Verbose.__init__(self, verbose)
319 self.__cond = Condition(Lock())
320 self.__value = value
321
322 def acquire(self, blocking=1):
323 rc = False
324 self.__cond.acquire()
325 while self.__value == 0:
326 if not blocking:
327 break
328 if __debug__:
329 self._note("%s.acquire(%s): blocked waiting, value=%s",
330 self, blocking, self.__value)
331 self.__cond.wait()
332 else:
333 self.__value = self.__value - 1
334 if __debug__:
335 self._note("%s.acquire: success, value=%s",
336 self, self.__value)
337 rc = True
338 self.__cond.release()
339 return rc
340
341 def release(self):
342 self.__cond.acquire()
343 self.__value = self.__value + 1
344 if __debug__:
345 self._note("%s.release: success, value=%s",
346 self, self.__value)
347 self.__cond.notify()
348 self.__cond.release()
349
350
351def BoundedSemaphore(*args, **kwargs):
352 return _BoundedSemaphore(*args, **kwargs)
353
354class _BoundedSemaphore(_Semaphore):
355 """Semaphore that checks that # releases is <= # acquires"""
356 def __init__(self, value=1, verbose=None):
357 _Semaphore.__init__(self, value, verbose)
358 self._initial_value = value
359
360 def __enter__(self):
361 self.acquire()
362 return self
363
364 def release(self):
365 if self._Semaphore__value >= self._initial_value:
366 raise ValueError, "Semaphore released too many times"
367 return _Semaphore.release(self)
368
369 def __exit__(self, t, v, tb):
370 self.release()
371
372
373def Event(*args, **kwargs):
374 return _Event(*args, **kwargs)
375
376class _Event(_Verbose):
377
378 # After Tim Peters' event class (without is_posted())
379
380 def __init__(self, verbose=None):
381 _Verbose.__init__(self, verbose)
382 self.__cond = Condition(Lock())
383 self.__flag = False
384
385 def isSet(self):
386 return self.__flag
387
388 def set(self):
389 self.__cond.acquire()
390 try:
391 self.__flag = True
392 self.__cond.notifyAll()
393 finally:
394 self.__cond.release()
395
396 def clear(self):
397 self.__cond.acquire()
398 try:
399 self.__flag = False
400 finally:
401 self.__cond.release()
402
403 def wait(self, timeout=None):
404 self.__cond.acquire()
405 try:
406 if not self.__flag:
407 self.__cond.wait(timeout)
408 finally:
409 self.__cond.release()