| import os |
| import subprocess |
| import sys |
| import threading |
| import functools |
| import contextlib |
| import logging |
| import re |
| import time |
| import gc |
| import traceback |
| from StringIO import StringIO |
| from test import test_support |
| |
| from concurrent import futures |
| from concurrent.futures._base import ( |
| PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) |
| from concurrent.futures.thread import cpu_count |
| |
| try: |
| import unittest2 as unittest |
| except ImportError: |
| import unittest |
| |
| |
def reap_threads(func):
    """Decorate *func* so threads it starts are cleaned up afterwards.

    The wrapper calls ``test_support.threading_setup()`` before running
    *func* and passes the returned values to
    ``test_support.threading_cleanup()`` once *func* returns or raises, so
    the cleanup also happens when the test fails.

    Positional and keyword arguments are forwarded to *func* unchanged and
    its return value is handed back to the caller.
    """
    @functools.wraps(func)
    def decorator(*args, **kwargs):
        key = test_support.threading_setup()
        try:
            return func(*args, **kwargs)
        finally:
            # Executed on the success path and on the failure path alike.
            test_support.threading_cleanup(*key)
    return decorator
| |
| |
# Executing the interpreter in a subprocess
def _assert_python(expected_success, *args, **env_vars):
    """Run ``sys.executable`` with *args* and check how it exited.

    *env_vars* are added to the child's environment.  When none are given
    the interpreter is started with ``-E``.  The special keyword
    ``__cleanenv`` (removed before the update) makes the child start from an
    empty environment instead of a copy of ``os.environ``.

    Returns ``(return code, stdout, stderr)`` where stderr has been passed
    through ``strip_python_stderr``.  Raises ``AssertionError`` when the
    exit status does not agree with *expected_success*.
    """
    command = [sys.executable]
    if not env_vars:
        command.append('-E')
    # A special flag can be set to override the inherited environment -- in
    # that case the caller is responsible for passing the full environment.
    # Otherwise the original environment is preserved, which is needed for
    # in-place testing of shared library builds.
    if env_vars.pop('__cleanenv', None):
        child_env = {}
    else:
        child_env = os.environ.copy()
    child_env.update(env_vars)
    command.extend(args)
    proc = subprocess.Popen(command,
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE,
                            env=child_env)
    try:
        out, err = proc.communicate()
    finally:
        subprocess._cleanup()
        proc.stdout.close()
        proc.stderr.close()
    rc = proc.returncode
    err = strip_python_stderr(err)
    # A non-zero status while success was expected, or a zero status while
    # failure was expected, are both errors.
    if bool(rc) == bool(expected_success):
        raise AssertionError(
            "Process return code is %d, "
            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
    return rc, out, err
| |
| |
def assert_python_ok(*args, **env_vars):
    """
    Run the interpreter with `args` and the optional environment variables
    `env_vars`, asserting that it succeeds.

    Delegates to `_assert_python` with success expected and returns its
    (return code, stdout, stderr) tuple.
    """
    expect_success = True
    return _assert_python(expect_success, *args, **env_vars)
| |
| |
def strip_python_stderr(stderr):
    """Remove interpreter debug output from the captured stderr of a Python
    process.

    A trailing ``[<digits> refs]`` marker (optionally followed by a line
    ending) is dropped, then surrounding whitespace is stripped.  The
    argument is expected to be what ``communicate()`` returned for stderr of
    a ``subprocess.Popen`` object.
    """
    refs_trailer = r"\[\d+ refs\]\r?\n?$".encode()
    without_refs = re.sub(refs_trailer, "".encode(), stderr)
    return without_refs.strip()
| |
| |
@contextlib.contextmanager
def captured_stderr():
    """Capture output logged through the root logger in a ``StringIO``.

    Despite its name this helper does not replace ``sys.stderr``.  It
    attaches a ``logging.StreamHandler`` that writes to a fresh ``StringIO``
    to the root logger, yields that buffer, and detaches the handler again
    on exit, even when the body of the ``with`` block raises.
    """
    logging_stream = StringIO()
    handler = logging.StreamHandler(logging_stream)
    logging.root.addHandler(handler)

    try:
        yield logging_stream
    finally:
        # Always detach so later tests do not keep writing into the buffer.
        logging.root.removeHandler(handler)
| |
| |
def create_future(state=PENDING, exception=None, result=None):
    """Build a ``Future`` whose private state fields are preset.

    The new future's ``_state``, ``_exception`` and ``_result`` attributes
    are assigned directly from the arguments; no callbacks are invoked.
    """
    future = Future()
    future._state = state
    future._exception = exception
    future._result = result
    return future
| |
| |
# Pre-built futures, one per state, used as fixed inputs by the tests below.
# EXCEPTION_FUTURE is finished with an IOError; SUCCESSFUL_FUTURE is finished
# with the result 42.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
| |
| |
def mul(x, y):
    """Return the product of *x* and *y* (a simple task for executors)."""
    product = x * y
    return product
| |
| |
def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise a generic ``Exception``."""
    time.sleep(t)
    failure = Exception('this is an exception')
    raise failure
| |
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, print *msg* and flush standard output."""
    time.sleep(t)
    print(msg)
    # Flush explicitly so the text is not left sitting in a buffer.
    out = sys.stdout
    out.flush()
| |
| |
class ExecutorMixin:
    """Fixture mixin that creates and tears down ``self.executor``.

    Subclasses supply ``executor_type``; it is called with
    ``max_workers=worker_count``.  The methods rely on ``skipTest`` and
    ``assertLess`` being available on the instance, i.e. on the class being
    combined with a ``unittest.TestCase``.
    """
    worker_count = 5

    def setUp(self):
        # Remember the start time so tearDown can check the test's duration.
        self.t1 = time.time()
        try:
            self.executor = self.executor_type(max_workers=self.worker_count)
        except NotImplementedError as exc:
            self.skipTest(str(exc))
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        elapsed = time.time() - self.t1
        if test_support.verbose:
            print("%.2fs" % elapsed)
        self.assertLess(elapsed, 60,
                        "synchronization issue: test lasted too long")

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        warmups = []
        for _ in range(self.worker_count):
            warmups.append(self.executor.submit(time.sleep, 0.1))

        for warmup in warmups:
            warmup.result()
| |
| |
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin variant whose executor is a ``ThreadPoolExecutor``."""
    executor_type = futures.ThreadPoolExecutor
| |
| |
class ProcessPoolMixin(ExecutorMixin):
    """ExecutorMixin variant whose executor is a ``ProcessPoolExecutor``."""
    executor_type = futures.ProcessPoolExecutor
| |
| |
class ExecutorShutdownTest(unittest.TestCase):
    """Shutdown tests that apply to every executor type.

    The methods use ``self.executor`` and ``self.executor_type``, which the
    concrete subclasses obtain from an executor mixin.
    """

    def test_run_after_shutdown(self):
        # Submitting to an executor that was shut down must be rejected.
        self.executor.shutdown()
        self.assertRaises(RuntimeError, self.executor.submit, pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        executor_name = self.executor_type.__name__
        script = """if 1:
            from concurrent.futures import %s
            from time import sleep
            from test_futures import sleep_and_print
            t = %s(5)
            t.submit(sleep_and_print, 1.0, "apple")
            """ % (executor_name, executor_name)
        rc, out, err = assert_python_ok('-c', script)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), "apple".encode())

    def test_hang_issue12364(self):
        # Results of work submitted before shutdown() must stay retrievable.
        submitted = []
        for _ in range(50):
            submitted.append(self.executor.submit(time.sleep, 0.1))
        self.executor.shutdown()
        for future in submitted:
            future.result()
| |
| |
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
    """Shutdown tests specific to ``ThreadPoolExecutor``."""

    def _prime_executor(self):
        # Skip the warm-up: test_threads_terminate counts the worker threads.
        pass

    def test_threads_terminate(self):
        for x, y in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, x, y)
        self.assertEqual(len(self.executor._threads), 3)
        self.executor.shutdown()
        for worker in self.executor._threads:
            worker.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as pool:
            mapped = list(pool.map(abs, range(-5, 5)))
            self.assertEqual(mapped, [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # Leaving the with block shuts the pool down; workers must finish.
        for worker in pool._threads:
            worker.join()

    def test_del_shutdown(self):
        pool = futures.ThreadPoolExecutor(max_workers=5)
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        # Drop the only reference and force a collection.
        del pool
        gc.collect()

        for worker in workers:
            worker.join()

    def test_thread_names_assigned(self):
        pool = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        del pool
        gc.collect()

        for worker in workers:
            self.assertRegexpMatches(worker.name, r'^SpecialPool_[0-4]$')
            worker.join()

    def test_thread_names_default(self):
        pool = futures.ThreadPoolExecutor(max_workers=5)
        pool.map(abs, range(-5, 5))
        workers = pool._threads
        del pool
        gc.collect()

        for worker in workers:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegexpMatches(worker.name,
                                     r'ThreadPoolExecutor-\d+_[0-4]$')
            worker.join()
| |
| |
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
    """Shutdown tests specific to ``ProcessPoolExecutor``."""

    def _prime_executor(self):
        # No warm-up work is submitted for these tests.
        pass

    def test_processes_terminate(self):
        for x, y in ((21, 2), (6, 7), (3, 14)):
            self.executor.submit(mul, x, y)
        self.assertEqual(len(self.executor._processes), 5)
        workers = self.executor._processes
        self.executor.shutdown()

        for worker in workers:
            worker.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as pool:
            workers = pool._processes
            mapped = list(pool.map(abs, range(-5, 5)))
            self.assertEqual(mapped, [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        # Leaving the with block shuts the pool down; workers must exit.
        for worker in workers:
            worker.join()

    def test_del_shutdown(self):
        pool = futures.ProcessPoolExecutor(max_workers=5)
        list(pool.map(abs, range(-5, 5)))
        manager_thread = pool._queue_management_thread
        workers = pool._processes
        # Drop the only reference and force a collection.
        del pool
        gc.collect()

        manager_thread.join()
        for worker in workers:
            worker.join()
| |
| |
class WaitTests(unittest.TestCase):
    """Tests for ``futures.wait`` under each ``return_when`` mode.

    The methods use ``self.executor``, which concrete subclasses obtain from
    an executor mixin, together with the module-level fixture futures.
    """

    def test_first_completed(self):
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, quick, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([quick]), done)
        self.assertEqual(set([CANCELLED_FUTURE, slow]), not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
            done)
        self.assertEqual(set([slow]), not_done)

    def test_first_exception(self):
        succeeds = self.executor.submit(mul, 2, 21)
        raises = self.executor.submit(sleep_and_raise, 1.5)
        slow = self.executor.submit(time.sleep, 3)

        done, not_done = futures.wait(
            [succeeds, raises, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([succeeds, raises]), done)
        self.assertEqual(set([slow]), not_done)

    def test_first_exception_some_already_complete(self):
        raises = self.executor.submit(divmod, 21, 0)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             raises, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              raises]), done)
        self.assertEqual(set([CANCELLED_FUTURE, slow]), not_done)

    def test_first_exception_one_already_failed(self):
        slow = self.executor.submit(time.sleep, 2)

        done, not_done = futures.wait(
            [EXCEPTION_FUTURE, slow],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), done)
        self.assertEqual(set([slow]), not_done)

    def test_all_completed(self):
        raises = self.executor.submit(divmod, 2, 0)
        succeeds = self.executor.submit(mul, 2, 21)

        done, not_done = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             raises,
             succeeds],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              raises,
                              succeeds]), done)
        self.assertEqual(set(), not_done)

    def test_timeout(self):
        quick = self.executor.submit(mul, 6, 7)
        slow = self.executor.submit(time.sleep, 3)

        # The 1.5s timeout expires before the 3s sleep completes.
        done, not_done = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             quick, slow],
            timeout=1.5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              quick]), done)
        self.assertEqual(set([slow]), not_done)
| |
| |
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
    """WaitTests run against a thread pool, plus a thread-specific race."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        gate = threading.Event()

        def future_func():
            gate.wait()

        saved_interval = sys.getcheckinterval()
        # Switch threads as often as possible to provoke the race.
        sys.setcheckinterval(1)
        try:
            fs = set()
            for _ in range(100):
                fs.add(self.executor.submit(future_func))
            gate.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setcheckinterval(saved_interval)
| |
| |
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
    """WaitTests run against a process pool; adds no tests of its own."""
| |
| |
class AsCompletedTests(unittest.TestCase):
    """Tests for ``futures.as_completed``.

    The methods use ``self.executor``, which concrete subclasses obtain from
    an executor mixin, together with the module-level fixture futures.
    """
    # TODO([email protected]): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             first, second]))
        self.assertEqual(set(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             first, second]),
            completed)

    def test_zero_timeout(self):
        slow = self.executor.submit(time.sleep, 2)
        seen = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     slow],
                    timeout=0):
                seen.add(future)
        except futures.TimeoutError:
            # A timeout is tolerated here; only the collected set is checked.
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         seen)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        slow = self.executor.submit(time.sleep, 2)
        completed = list(futures.as_completed([slow, slow]))
        self.assertEqual(len(completed), 1)
| |
| |
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
    """AsCompletedTests run against a thread pool; adds no tests."""
| |
| |
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
    """AsCompletedTests run against a process pool; adds no tests."""
| |
| |
| class ExecutorTest(unittest.TestCase): |
| # Executor.shutdown() and context manager usage is tested by |
| # ExecutorShutdownTest. |
| def test_submit(self): |
| future = self.executor.submit(pow, 2, 8) |
| self.assertEqual(256, future.result()) |
| |
| def test_submit_keyword(self): |
| future = self.executor.submit(mul, 2, y=8) |
| self.assertEqual(16, future.result()) |
| |
| def test_map(self): |
| self.assertEqual( |
| list(self.executor.map(pow, range(10), range(10))), |
| list(map(pow, range(10), range(10)))) |
| |
| def test_map_exception(self): |
| i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) |
| self.assertEqual(next(i), (0, 1)) |
| self.assertEqual(next(i), (0, 1)) |
| self.assertRaises(ZeroDivisionError, next, i) |
| |
| def test_map_timeout(self): |
| results = [] |
| try: |
| for i in self.executor.map(time.sleep, |
| [0, 0, 3], |
| timeout=1.5): |
| results.append(i) |
| except futures.TimeoutError: |
| pass |
| else: |
| self.fail('expected TimeoutError') |
| |
| self.assertEqual([None, None], results) |
| |
| def test_max_workers_negative(self): |
| for number in (0, -1): |
| with self.assertRaises(ValueError) as cm: |
| self.executor_type(max_workers=number) |
| |
| assert str(cm.exception) == "max_workers must be greater than 0" |
| |
| |
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
    """ExecutorTest run against a thread pool, plus thread-only checks."""

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        seen = []

        def remember(n):
            seen.append(n)

        # The iterator returned by map() is deliberately never consumed.
        self.executor.map(remember, range(10))
        self.executor.shutdown(wait=True)
        self.assertEqual(len(seen), 10)

    def test_default_workers(self):
        pool = self.executor_type()
        self.assertEqual(pool._max_workers,
                         (cpu_count() or 1) * 5)
| |
| |
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
    """ExecutorTest run against a process pool; adds no tests of its own."""
| |
| |
| class FutureTests(unittest.TestCase): |
| def test_done_callback_with_result(self): |
| callback_result = [None] |
| def fn(callback_future): |
| callback_result[0] = callback_future.result() |
| |
| f = Future() |
| f.add_done_callback(fn) |
| f.set_result(5) |
| self.assertEqual(5, callback_result[0]) |
| |
| def test_done_callback_with_exception(self): |
| callback_exception = [None] |
| def fn(callback_future): |
| callback_exception[0] = callback_future.exception() |
| |
| f = Future() |
| f.add_done_callback(fn) |
| f.set_exception(Exception('test')) |
| self.assertEqual(('test',), callback_exception[0].args) |
| |
| def test_done_callback_with_cancel(self): |
| was_cancelled = [None] |
| def fn(callback_future): |
| was_cancelled[0] = callback_future.cancelled() |
| |
| f = Future() |
| f.add_done_callback(fn) |
| self.assertTrue(f.cancel()) |
| self.assertTrue(was_cancelled[0]) |
| |
| def test_done_callback_raises(self): |
| with captured_stderr() as stderr: |
| raising_was_called = [False] |
| raising_old_style_was_called = [False] |
| fn_was_called = [False] |
| |
| def raising_fn(callback_future): |
| raising_was_called[0] = True |
| raise Exception('doh!') |
| |
| def raising_old_style_fn(callback_future): |
| raising_old_style_was_called[0] = True |
| class OldStyle: # Does not inherit from object |
| def __str__(self): |
| return 'doh!' |
| raise OldStyle() |
| |
| def fn(callback_future): |
| fn_was_called[0] = True |
| |
| f = Future() |
| f.add_done_callback(raising_fn) |
| f.add_done_callback(raising_old_style_fn) |
| f.add_done_callback(fn) |
| f.set_result(5) |
| self.assertTrue(raising_was_called) |
| self.assertTrue(raising_old_style_was_called) |
| self.assertTrue(fn_was_called) |
| self.assertIn('Exception: doh!', stderr.getvalue()) |
| self.assertIn('OldStyle: doh!', stderr.getvalue()) |
| |
| def test_done_callback_already_successful(self): |
| callback_result = [None] |
| def fn(callback_future): |
| callback_result[0] = callback_future.result() |
| |
| f = Future() |
| f.set_result(5) |
| f.add_done_callback(fn) |
| self.assertEqual(5, callback_result[0]) |
| |
| def test_done_callback_already_failed(self): |
| callback_exception = [None] |
| def fn(callback_future): |
| callback_exception[0] = callback_future.exception() |
| |
| f = Future() |
| f.set_exception(Exception('test')) |
| f.add_done_callback(fn) |
| self.assertEqual(('test',), callback_exception[0].args) |
| |
| def test_done_callback_already_cancelled(self): |
| was_cancelled = [None] |
| def fn(callback_future): |
| was_cancelled[0] = callback_future.cancelled() |
| |
| f = Future() |
| self.assertTrue(f.cancel()) |
| f.add_done_callback(fn) |
| self.assertTrue(was_cancelled[0]) |
| |
| def test_repr(self): |
| self.assertRegexpMatches(repr(PENDING_FUTURE), |
| '<Future at 0x[0-9a-f]+L? state=pending>') |
| self.assertRegexpMatches(repr(RUNNING_FUTURE), |
| '<Future at 0x[0-9a-f]+L? state=running>') |
| self.assertRegexpMatches(repr(CANCELLED_FUTURE), |
| '<Future at 0x[0-9a-f]+L? state=cancelled>') |
| self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE), |
| '<Future at 0x[0-9a-f]+L? state=cancelled>') |
| self.assertRegexpMatches( |
| repr(EXCEPTION_FUTURE), |
| '<Future at 0x[0-9a-f]+L? state=finished raised IOError>') |
| self.assertRegexpMatches( |
| repr(SUCCESSFUL_FUTURE), |
| '<Future at 0x[0-9a-f]+L? state=finished returned int>') |
| |
| def test_cancel(self): |
| f1 = create_future(state=PENDING) |
| f2 = create_future(state=RUNNING) |
| f3 = create_future(state=CANCELLED) |
| f4 = create_future(state=CANCELLED_AND_NOTIFIED) |
| f5 = create_future(state=FINISHED, exception=IOError()) |
| f6 = create_future(state=FINISHED, result=5) |
| |
| self.assertTrue(f1.cancel()) |
| self.assertEqual(f1._state, CANCELLED) |
| |
| self.assertFalse(f2.cancel()) |
| self.assertEqual(f2._state, RUNNING) |
| |
| self.assertTrue(f3.cancel()) |
| self.assertEqual(f3._state, CANCELLED) |
| |
| self.assertTrue(f4.cancel()) |
| self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) |
| |
| self.assertFalse(f5.cancel()) |
| self.assertEqual(f5._state, FINISHED) |
| |
| self.assertFalse(f6.cancel()) |
| self.assertEqual(f6._state, FINISHED) |
| |
| def test_cancelled(self): |
| self.assertFalse(PENDING_FUTURE.cancelled()) |
| self.assertFalse(RUNNING_FUTURE.cancelled()) |
| self.assertTrue(CANCELLED_FUTURE.cancelled()) |
| self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) |
| self.assertFalse(EXCEPTION_FUTURE.cancelled()) |
| self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) |
| |
| def test_done(self): |
| self.assertFalse(PENDING_FUTURE.done()) |
| self.assertFalse(RUNNING_FUTURE.done()) |
| self.assertTrue(CANCELLED_FUTURE.done()) |
| self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) |
| self.assertTrue(EXCEPTION_FUTURE.done()) |
| self.assertTrue(SUCCESSFUL_FUTURE.done()) |
| |
| def test_running(self): |
| self.assertFalse(PENDING_FUTURE.running()) |
| self.assertTrue(RUNNING_FUTURE.running()) |
| self.assertFalse(CANCELLED_FUTURE.running()) |
| self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) |
| self.assertFalse(EXCEPTION_FUTURE.running()) |
| self.assertFalse(SUCCESSFUL_FUTURE.running()) |
| |
| def test_result_with_timeout(self): |
| self.assertRaises(futures.TimeoutError, |
| PENDING_FUTURE.result, timeout=0) |
| self.assertRaises(futures.TimeoutError, |
| RUNNING_FUTURE.result, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_FUTURE.result, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) |
| self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0) |
| self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) |
| |
| def test_result_with_success(self): |
| # TODO([email protected]): This test is timing dependant. |
| def notification(): |
| # Wait until the main thread is waiting for the result. |
| time.sleep(1) |
| f1.set_result(42) |
| |
| f1 = create_future(state=PENDING) |
| t = threading.Thread(target=notification) |
| t.start() |
| |
| self.assertEqual(f1.result(timeout=5), 42) |
| |
| def test_result_with_cancel(self): |
| # TODO([email protected]): This test is timing dependant. |
| def notification(): |
| # Wait until the main thread is waiting for the result. |
| time.sleep(1) |
| f1.cancel() |
| |
| f1 = create_future(state=PENDING) |
| t = threading.Thread(target=notification) |
| t.start() |
| |
| self.assertRaises(futures.CancelledError, f1.result, timeout=5) |
| |
| def test_exception_with_timeout(self): |
| self.assertRaises(futures.TimeoutError, |
| PENDING_FUTURE.exception, timeout=0) |
| self.assertRaises(futures.TimeoutError, |
| RUNNING_FUTURE.exception, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_FUTURE.exception, timeout=0) |
| self.assertRaises(futures.CancelledError, |
| CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) |
| self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), |
| IOError)) |
| self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) |
| |
| def test_exception_with_success(self): |
| def notification(): |
| # Wait until the main thread is waiting for the exception. |
| time.sleep(1) |
| with f1._condition: |
| f1._state = FINISHED |
| f1._exception = IOError() |
| f1._condition.notify_all() |
| |
| f1 = create_future(state=PENDING) |
| t = threading.Thread(target=notification) |
| t.start() |
| |
| self.assertTrue(isinstance(f1.exception(timeout=5), IOError)) |
| |
| def test_old_style_exception(self): |
| class OldStyle: # Does not inherit from object |
| def __str__(self): |
| return 'doh!' |
| callback_exc_info = [None] |
| def fn(callback_future): |
| callback_exc_info[0] = callback_future.exception_info() |
| f = Future() |
| f.add_done_callback(fn) |
| try: |
| raise OldStyle() |
| except OldStyle: |
| want_exc_info = sys.exc_info() |
| f.set_exception_info(*want_exc_info[1:]) |
| self.assertEqual(f.exception_info(), want_exc_info[1:]) |
| self.assertEqual(callback_exc_info[0], want_exc_info[1:]) |
| try: |
| f.result() |
| except OldStyle: |
| got_exc_info = sys.exc_info() |
| else: |
| self.fail('OldStyle exception not raised') |
| self.assertEqual(got_exc_info[:2], want_exc_info[:2]) |
| got_tb = traceback.extract_tb(got_exc_info[2]) |
| want_tb = traceback.extract_tb(want_exc_info[2]) |
| self.assertEqual(got_tb[-len(want_tb):], want_tb) |
| |
@reap_threads
def test_main():
    """Run every test class of this module through ``test_support``.

    Child processes are reaped afterwards whether or not the run succeeds.
    """
    try:
        test_classes = (
            ProcessPoolExecutorTest,
            ThreadPoolExecutorTest,
            ProcessPoolWaitTests,
            ThreadPoolWaitTests,
            ProcessPoolAsCompletedTests,
            ThreadPoolAsCompletedTests,
            FutureTests,
            ProcessPoolShutdownTest,
            ThreadPoolShutdownTest,
        )
        test_support.run_unittest(*test_classes)
    finally:
        test_support.reap_children()
| |
# Run the whole suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()