| import inspect |
| import types |
| import unittest |
| import contextlib |
| |
| from test.support.import_helper import import_module |
| from test.support import gc_collect, requires_working_socket |
| asyncio = import_module("asyncio") |
| |
| |
| requires_working_socket(module=True) |
| |
| _no_default = object() |
| |
| |
class AwaitException(Exception):
    """Exception that run_until_complete() throws into a coroutine."""
| |
| |
@types.coroutine
def awaitable(*, throw=False):
    """Generator-based coroutine that yields exactly one marker tuple.

    The marker is ``('throw',)`` when *throw* is true and ``('result',)``
    otherwise.
    """
    marker = ('throw',) if throw else ('result',)
    yield marker
| |
| |
def run_until_complete(coro):
    """Drive *coro* to completion without an event loop and return its result.

    The coroutine is resumed with ``send(None)``.  Whenever it yields the
    marker ``('throw',)``, the next resumption throws AwaitException into it
    instead.  Any other yielded value is ignored.

    Returns the value the coroutine returned (None if it returned nothing).
    Exceptions escaping the coroutine propagate to the caller.
    """
    throw_next = False
    while True:
        try:
            if throw_next:
                throw_next = False
                step = coro.throw(AwaitException)
            else:
                step = coro.send(None)
        except StopIteration as stop:
            # StopIteration.args is empty when the coroutine returns None,
            # so indexing args would raise IndexError; .value handles both.
            return stop.value

        if step == ('throw',):
            throw_next = True
| |
| |
def to_list(gen):
    """Synchronously collect every item of the async iterable *gen* in a list."""
    async def collect():
        return [item async for item in gen]

    return run_until_complete(collect())
| |
| |
def py_anext(iterator, default=_no_default):
    """Pure-Python implementation of anext() for testing purposes.

    Closely matches the builtin anext() C implementation, so the two can be
    cross-tested: the same checks are run against this high-level version
    and against the builtin, including send() and throw() on the returned
    awaitable.

    Without *default*, the result of ``type(iterator).__anext__(iterator)``
    is returned unchanged.  With *default*, a coroutine is returned that
    awaits ``__anext__()`` and yields *default* as its result when
    StopAsyncIteration is raised.

    Raises TypeError if type(iterator) has no ``__anext__``.
    """

    try:
        __anext__ = type(iterator).__anext__
    except AttributeError:
        # Suppress the internal AttributeError so the TypeError is raised
        # without implicit exception context.
        raise TypeError(f'{iterator!r} is not an async iterator') from None

    if default is _no_default:
        return __anext__(iterator)

    async def anext_impl():
        try:
            # The C code is way more low-level than this, as it implements
            # all methods of the iterator protocol. In this implementation
            # we're relying on higher-level coroutine concepts, but that's
            # exactly what we want -- crosstest pure-Python high-level
            # implementation and low-level C anext() iterators.
            return await __anext__(iterator)
        except StopAsyncIteration:
            return default

    return anext_impl()
| |
| |
class AsyncGenSyntaxTest(unittest.TestCase):
    # Each test executes source for an invalid ``async def`` and checks that
    # it is rejected with a SyntaxError carrying the expected message.

    def _assert_rejected(self, source, message_regex):
        # exec() with fresh namespaces so nothing leaks between tests.
        with self.assertRaisesRegex(SyntaxError, message_regex):
            exec(source, {}, {})

    def test_async_gen_syntax_01(self):
        code = '''async def foo():
            await abc
            yield from 123
        '''

        self._assert_rejected(code, 'yield from.*inside async')

    def test_async_gen_syntax_02(self):
        code = '''async def foo():
            yield from 123
        '''

        self._assert_rejected(code, 'yield from.*inside async')

    def test_async_gen_syntax_03(self):
        code = '''async def foo():
            await abc
            yield
            return 123
        '''

        self._assert_rejected(code, 'return.*value.*async gen')

    def test_async_gen_syntax_04(self):
        code = '''async def foo():
            yield
            return 123
        '''

        self._assert_rejected(code, 'return.*value.*async gen')

    def test_async_gen_syntax_05(self):
        code = '''async def foo():
            if 0:
                yield
            return 12
        '''

        self._assert_rejected(code, 'return.*value.*async gen')
| |
| |
class AsyncGenTest(unittest.TestCase):
    # Tests that step async generators by hand, without an event loop.

    def compare_generators(self, sync_gen, async_gen):
        # Exhaust both generators, recording every produced value and the
        # type of every raised exception, and require identical traces.
        def sync_iterate(g):
            trace = []
            while True:
                try:
                    trace.append(g.__next__())
                except StopIteration:
                    trace.append('STOP')
                    break
                except Exception as ex:
                    trace.append(str(type(ex)))
            return trace

        def async_iterate(g):
            trace = []
            while True:
                step = g.__anext__()
                try:
                    while True:
                        try:
                            step.__next__()
                        except StopIteration as ex:
                            # A yielded item arrives as StopIteration(item).
                            if ex.args:
                                trace.append(ex.args[0])
                            else:
                                trace.append('EMPTY StopIteration')
                            break
                        except StopAsyncIteration:
                            # Handled by the outer try: the generator is done.
                            raise
                        except Exception as ex:
                            trace.append(str(type(ex)))
                            break
                except StopAsyncIteration:
                    trace.append('STOP')
                    break
            return trace

        expected = sync_iterate(sync_gen)
        actual = async_iterate(async_gen)
        self.assertEqual(expected, actual)
        return actual

    def test_async_gen_iteration_01(self):
        async def gen():
            await awaitable()
            received = yield 123
            self.assertIs(received, None)
            await awaitable()
            yield 456
            await awaitable()
            yield 789

        self.assertEqual(to_list(gen()), [123, 456, 789])

    def test_async_gen_iteration_02(self):
        async def gen():
            await awaitable()
            yield 123
            await awaitable()

        agen = gen()
        aiterator = agen.__aiter__()

        # First item: one suspension on awaitable(), then the value.
        step = aiterator.__anext__()
        self.assertEqual(step.__next__(), ('result',))

        try:
            step.__next__()
        except StopIteration as ex:
            self.assertEqual(ex.args[0], 123)
        else:
            self.fail('StopIteration was not raised')

        # Second step: one suspension, then the generator finishes.
        step = aiterator.__anext__()
        self.assertEqual(step.__next__(), ('result',))

        try:
            step.__next__()
        except StopAsyncIteration as ex:
            self.assertFalse(ex.args)
        else:
            self.fail('StopAsyncIteration was not raised')

    def test_async_gen_exception_03(self):
        async def gen():
            await awaitable()
            yield 123
            await awaitable(throw=True)
            yield 456

        with self.assertRaises(AwaitException):
            to_list(gen())

    def test_async_gen_exception_04(self):
        async def gen():
            await awaitable()
            yield 123
            1 / 0

        agen = gen()
        aiterator = agen.__aiter__()
        step = aiterator.__anext__()
        self.assertEqual(step.__next__(), ('result',))

        try:
            step.__next__()
        except StopIteration as ex:
            self.assertEqual(ex.args[0], 123)
        else:
            self.fail('StopIteration was not raised')

        with self.assertRaises(ZeroDivisionError):
            aiterator.__anext__().__next__()

    def test_async_gen_exception_05(self):
        async def gen():
            yield 123
            raise StopAsyncIteration

        with self.assertRaisesRegex(RuntimeError,
                                    'async generator.*StopAsyncIteration'):
            to_list(gen())

    def test_async_gen_exception_06(self):
        async def gen():
            yield 123
            raise StopIteration

        with self.assertRaisesRegex(RuntimeError,
                                    'async generator.*StopIteration'):
            to_list(gen())

    def test_async_gen_exception_07(self):
        def sync_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        async def async_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_08(self):
        def sync_gen():
            try:
                yield 1
            finally:
                yield 2
                1 / 0
                yield 3

            yield 100

        async def async_gen():
            try:
                yield 1
                await awaitable()
            finally:
                await awaitable()
                yield 2
                1 / 0
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_09(self):
        def sync_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        async def async_gen():
            try:
                await awaitable()
                yield 1
                1 / 0
            finally:
                yield 2
                await awaitable()
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_10(self):
        async def gen():
            yield 123

        first_step = gen().__anext__()
        with self.assertRaisesRegex(TypeError,
                                    "non-None value .* async generator"):
            first_step.send(100)

    def test_async_gen_exception_11(self):
        def sync_gen():
            yield 10
            yield 20

        def sync_gen_wrapper():
            yield 1
            inner = sync_gen()
            inner.send(None)
            try:
                inner.throw(GeneratorExit())
            except GeneratorExit:
                yield 2
            yield 3

        async def async_gen():
            yield 10
            yield 20

        async def async_gen_wrapper():
            yield 1
            inner = async_gen()
            await inner.asend(None)
            try:
                await inner.athrow(GeneratorExit())
            except GeneratorExit:
                yield 2
            yield 3

        self.compare_generators(sync_gen_wrapper(), async_gen_wrapper())

    def test_async_gen_api_01(self):
        # The generator function must stay named ``gen``: the name is
        # asserted below.
        async def gen():
            yield 123

        g = gen()

        # __name__ and __qualname__ are readable and writable.
        self.assertEqual(g.__name__, 'gen')
        g.__name__ = '123'
        self.assertEqual(g.__name__, '123')

        self.assertIn('.gen', g.__qualname__)
        g.__qualname__ = '123'
        self.assertEqual(g.__qualname__, '123')

        # Introspection attributes of a not-yet-started generator.
        self.assertIsNone(g.ag_await)
        self.assertIsInstance(g.ag_frame, types.FrameType)
        self.assertFalse(g.ag_running)
        self.assertIsInstance(g.ag_code, types.CodeType)

        self.assertTrue(inspect.isawaitable(g.aclose()))
| |
| |
| class AsyncGenAsyncioTest(unittest.TestCase): |
| |
def setUp(self):
    # Each test gets a private loop stored on the instance.
    self.loop = asyncio.new_event_loop()
    # The current event loop is explicitly unset for the test.
    asyncio.set_event_loop(None)
| |
def tearDown(self):
    # Close and drop the loop held on self.loop.
    self.loop.close()
    self.loop = None
    # Reset the global event loop policy to the default.
    asyncio.set_event_loop_policy(None)
| |
def check_async_iterator_anext(self, ait_class):
    # Run the shared checks once per anext() implementation.
    implementations = (("pure-Python", py_anext), ("builtin", anext))
    for label, anext_impl in implementations:
        with self.subTest(anext=label):
            self._check_async_iterator_anext(ait_class, anext_impl)
| |
| def _check_async_iterator_anext(self, ait_class, anext): |
| g = ait_class() |
| async def consume(): |
| results = [] |
| results.append(await anext(g)) |
| results.append(await anext(g)) |
| results.append(await anext(g, 'buckle my shoe')) |
| return results |
| res = self.loop.run_until_complete(consume()) |
| self.assertEqual(res, [1, 2, 'buckle my shoe']) |
| with self.assertRaises(StopAsyncIteration): |
| self.loop.run_until_complete(consume()) |
| |
| async def test_2(): |
| g1 = ait_class() |
| self.assertEqual(await anext(g1), 1) |
| self.assertEqual(await anext(g1), 2) |
| with self.assertRaises(StopAsyncIteration): |
| await anext(g1) |
| with self.assertRaises(StopAsyncIteration): |
| await anext(g1) |
| |
| g2 = ait_class() |
| self.assertEqual(await anext(g2, "default"), 1) |
| self.assertEqual(await anext(g2, "default"), 2) |
| self.assertEqual(await anext(g2, "default"), "default") |
| self.assertEqual(await anext(g2, "default"), "default") |
| |
| return "completed" |
| |
| result = self.loop.run_until_complete(test_2()) |
| self.assertEqual(result, "completed") |
| |
| def test_send(): |
| p = ait_class() |
| obj = anext(p, "completed") |
| with self.assertRaises(StopIteration): |
| with contextlib.closing(obj.__await__()) as g: |
| g.send(None) |
| |
| test_send() |
| |
| async def test_throw(): |
| p = ait_class() |
| obj = anext(p, "completed") |
| self.assertRaises(SyntaxError, obj.throw, SyntaxError) |
| return "completed" |
| |
| result = self.loop.run_until_complete(test_throw()) |
| self.assertEqual(result, "completed") |
| |
def test_async_generator_anext(self):
    # Run the anext() checks against a native async generator.
    async def one_then_two():
        yield 1
        yield 2

    self.check_async_iterator_anext(one_then_two)
| |
def test_python_async_iterator_anext(self):
    # Run the anext() checks against a class whose __anext__ is a native
    # coroutine function.
    class MyAsyncIter:
        """Asynchronously yield 1, then 2."""

        def __init__(self):
            self.yielded = 0

        def __aiter__(self):
            return self

        async def __anext__(self):
            if self.yielded >= 2:
                raise StopAsyncIteration()
            self.yielded += 1
            return self.yielded

    self.check_async_iterator_anext(MyAsyncIter)
| |
def test_python_async_iterator_types_coroutine_anext(self):
    # Run the anext() checks against a class whose __anext__ is a
    # generator-based coroutine.
    import types

    class MyAsyncIterWithTypesCoro:
        """Asynchronously yield 1, then 2."""

        def __init__(self):
            self.yielded = 0

        def __aiter__(self):
            return self

        @types.coroutine
        def __anext__(self):
            # The unreachable yield is what makes this a generator function.
            if False:
                yield "this is a generator-based coroutine"
            if self.yielded >= 2:
                raise StopAsyncIteration()
            self.yielded += 1
            return self.yielded

    self.check_async_iterator_anext(MyAsyncIterWithTypesCoro)
| |
| def test_async_gen_aiter(self): |
| async def gen(): |
| yield 1 |
| yield 2 |
| g = gen() |
| async def consume(): |
| return [i async for i in aiter(g)] |
| res = self.loop.run_until_complete(consume()) |
| self.assertEqual(res, [1, 2]) |
| |
| def test_async_gen_aiter_class(self): |
| results = [] |
| class Gen: |
| async def __aiter__(self): |
| yield 1 |
| yield 2 |
| g = Gen() |
| async def consume(): |
| ait = aiter(g) |
| while True: |
| try: |
| results.append(await anext(ait)) |
| except StopAsyncIteration: |
| break |
| self.loop.run_until_complete(consume()) |
| self.assertEqual(results, [1, 2]) |
| |
| def test_aiter_idempotent(self): |
| async def gen(): |
| yield 1 |
| applied_once = aiter(gen()) |
| applied_twice = aiter(applied_once) |
| self.assertIs(applied_once, applied_twice) |
| |
| def test_anext_bad_args(self): |
| async def gen(): |
| yield 1 |
| async def call_with_too_few_args(): |
| await anext() |
| async def call_with_too_many_args(): |
| await anext(gen(), 1, 3) |
| async def call_with_wrong_type_args(): |
| await anext(1, gen()) |
| async def call_with_kwarg(): |
| await anext(aiterator=gen()) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_too_few_args()) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_too_many_args()) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_wrong_type_args()) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_kwarg()) |
| |
| def test_anext_bad_await(self): |
| async def bad_awaitable(): |
| class BadAwaitable: |
| def __await__(self): |
| return 42 |
| class MyAsyncIter: |
| def __aiter__(self): |
| return self |
| def __anext__(self): |
| return BadAwaitable() |
| regex = r"__await__.*iterator" |
| awaitable = anext(MyAsyncIter(), "default") |
| with self.assertRaisesRegex(TypeError, regex): |
| await awaitable |
| awaitable = anext(MyAsyncIter()) |
| with self.assertRaisesRegex(TypeError, regex): |
| await awaitable |
| return "completed" |
| result = self.loop.run_until_complete(bad_awaitable()) |
| self.assertEqual(result, "completed") |
| |
| async def check_anext_returning_iterator(self, aiter_class): |
| awaitable = anext(aiter_class(), "default") |
| with self.assertRaises(TypeError): |
| await awaitable |
| awaitable = anext(aiter_class()) |
| with self.assertRaises(TypeError): |
| await awaitable |
| return "completed" |
| |
| def test_anext_return_iterator(self): |
| class WithIterAnext: |
| def __aiter__(self): |
| return self |
| def __anext__(self): |
| return iter("abc") |
| result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithIterAnext)) |
| self.assertEqual(result, "completed") |
| |
| def test_anext_return_generator(self): |
| class WithGenAnext: |
| def __aiter__(self): |
| return self |
| def __anext__(self): |
| yield |
| result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithGenAnext)) |
| self.assertEqual(result, "completed") |
| |
| def test_anext_await_raises(self): |
| class RaisingAwaitable: |
| def __await__(self): |
| raise ZeroDivisionError() |
| yield |
| class WithRaisingAwaitableAnext: |
| def __aiter__(self): |
| return self |
| def __anext__(self): |
| return RaisingAwaitable() |
| async def do_test(): |
| awaitable = anext(WithRaisingAwaitableAnext()) |
| with self.assertRaises(ZeroDivisionError): |
| await awaitable |
| awaitable = anext(WithRaisingAwaitableAnext(), "default") |
| with self.assertRaises(ZeroDivisionError): |
| await awaitable |
| return "completed" |
| result = self.loop.run_until_complete(do_test()) |
| self.assertEqual(result, "completed") |
| |
def test_anext_iter(self):
    # Step the awaitable returned by anext(agen, "default") by hand with
    # send()/throw()/close().  Every scenario runs against both the
    # pure-Python py_anext() and the builtin anext().
    #
    # Exceptions are thrown with the single-argument throw(exc) form; the
    # (type, value, traceback) signature is deprecated since Python 3.12.
    @types.coroutine
    def _async_yield(v):
        return (yield v)

    class MyError(Exception):
        pass

    async def agenfn():
        try:
            await _async_yield(1)
        except MyError:
            await _async_yield(2)
        return
        # Unreachable; its presence makes agenfn an async generator.
        yield

    def test1(anext):
        # After the generator returns, the default is delivered.
        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 1)
            self.assertEqual(g.throw(MyError()), 2)
            try:
                g.send(None)
            except StopIteration as e:
                err = e
            else:
                self.fail('StopIteration was not raised')
            self.assertEqual(err.value, "default")

    def test2(anext):
        # A second MyError is not handled by the generator and propagates.
        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 1)
            self.assertEqual(g.throw(MyError()), 2)
            with self.assertRaises(MyError):
                g.throw(MyError())

    def test3(anext):
        # A closed awaitable cannot be resumed.
        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 1)
            g.close()
            with self.assertRaisesRegex(RuntimeError, 'cannot reuse'):
                self.assertEqual(g.send(None), 1)

    def test4(anext):
        @types.coroutine
        def _async_yield(v):
            yield v * 10
            return (yield (v * 10 + 1))

        async def agenfn():
            try:
                await _async_yield(1)
            except MyError:
                await _async_yield(2)
            return
            yield

        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 10)
            self.assertEqual(g.throw(MyError()), 20)
            with self.assertRaisesRegex(MyError, 'val'):
                g.throw(MyError('val'))

    def test5(anext):
        @types.coroutine
        def _async_yield(v):
            yield v * 10
            return (yield (v * 10 + 1))

        async def agenfn():
            try:
                await _async_yield(1)
            except MyError:
                return
            yield 'aaa'

        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            self.assertEqual(g.send(None), 10)
            with self.assertRaisesRegex(StopIteration, 'default'):
                g.throw(MyError())

    def test6(anext):
        @types.coroutine
        def _async_yield(v):
            yield v * 10
            return (yield (v * 10 + 1))

        async def agenfn():
            await _async_yield(1)
            yield 'aaa'

        agen = agenfn()
        with contextlib.closing(anext(agen, "default").__await__()) as g:
            # throw() before the first send().
            with self.assertRaises(MyError):
                g.throw(MyError())

    def run_test(test):
        with self.subTest('pure-Python anext()'):
            test(py_anext)
        with self.subTest('builtin anext()'):
            test(anext)

    for scenario in (test1, test2, test3, test4, test5, test6):
        run_test(scenario)
| |
| def test_aiter_bad_args(self): |
| async def gen(): |
| yield 1 |
| async def call_with_too_few_args(): |
| await aiter() |
| async def call_with_too_many_args(): |
| await aiter(gen(), 1) |
| async def call_with_wrong_type_arg(): |
| await aiter(1) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_too_few_args()) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_too_many_args()) |
| with self.assertRaises(TypeError): |
| self.loop.run_until_complete(call_with_wrong_type_arg()) |
| |
async def to_list(self, gen):
    # Collect every item of the async iterable ``gen`` into a list.
    return [item async for item in gen]
| |
| def test_async_gen_asyncio_01(self): |
| async def gen(): |
| yield 1 |
| await asyncio.sleep(0.01) |
| yield 2 |
| await asyncio.sleep(0.01) |
| return |
| yield 3 |
| |
| res = self.loop.run_until_complete(self.to_list(gen())) |
| self.assertEqual(res, [1, 2]) |
| |
| def test_async_gen_asyncio_02(self): |
| async def gen(): |
| yield 1 |
| await asyncio.sleep(0.01) |
| yield 2 |
| 1 / 0 |
| yield 3 |
| |
| with self.assertRaises(ZeroDivisionError): |
| self.loop.run_until_complete(self.to_list(gen())) |
| |
| def test_async_gen_asyncio_03(self): |
| loop = self.loop |
| |
| class Gen: |
| async def __aiter__(self): |
| yield 1 |
| await asyncio.sleep(0.01) |
| yield 2 |
| |
| res = loop.run_until_complete(self.to_list(Gen())) |
| self.assertEqual(res, [1, 2]) |
| |
| def test_async_gen_asyncio_anext_04(self): |
| async def foo(): |
| yield 1 |
| await asyncio.sleep(0.01) |
| try: |
| yield 2 |
| yield 3 |
| except ZeroDivisionError: |
| yield 1000 |
| await asyncio.sleep(0.01) |
| yield 4 |
| |
| async def run1(): |
| it = foo().__aiter__() |
| |
| self.assertEqual(await it.__anext__(), 1) |
| self.assertEqual(await it.__anext__(), 2) |
| self.assertEqual(await it.__anext__(), 3) |
| self.assertEqual(await it.__anext__(), 4) |
| with self.assertRaises(StopAsyncIteration): |
| await it.__anext__() |
| with self.assertRaises(StopAsyncIteration): |
| await it.__anext__() |
| |
| async def run2(): |
| it = foo().__aiter__() |
| |
| self.assertEqual(await it.__anext__(), 1) |
| self.assertEqual(await it.__anext__(), 2) |
| try: |
| it.__anext__().throw(ZeroDivisionError) |
| except StopIteration as ex: |
| self.assertEqual(ex.args[0], 1000) |
| else: |
| self.fail('StopIteration was not raised') |
| self.assertEqual(await it.__anext__(), 4) |
| with self.assertRaises(StopAsyncIteration): |
| await it.__anext__() |
| |
| self.loop.run_until_complete(run1()) |
| self.loop.run_until_complete(run2()) |
| |
| def test_async_gen_asyncio_anext_05(self): |
| async def foo(): |
| v = yield 1 |
| v = yield v |
| yield v * 100 |
| |
| async def run(): |
| it = foo().__aiter__() |
| |
| try: |
| it.__anext__().send(None) |
| except StopIteration as ex: |
| self.assertEqual(ex.args[0], 1) |
| else: |
| self.fail('StopIteration was not raised') |
| |
| try: |
| it.__anext__().send(10) |
| except StopIteration as ex: |
| self.assertEqual(ex.args[0], 10) |
| else: |
| self.fail('StopIteration was not raised') |
| |
| try: |
| it.__anext__().send(12) |
| except StopIteration as ex: |
| self.assertEqual(ex.args[0], 1200) |
| else: |
| self.fail('StopIteration was not raised') |
| |
| with self.assertRaises(StopAsyncIteration): |
| await it.__anext__() |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_asyncio_anext_06(self): |
| DONE = 0 |
| |
| # test synchronous generators |
| def foo(): |
| try: |
| yield |
| except: |
| pass |
| g = foo() |
| g.send(None) |
| with self.assertRaises(StopIteration): |
| g.send(None) |
| |
| # now with asynchronous generators |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| yield |
| except: |
| pass |
| DONE = 1 |
| |
| async def run(): |
| nonlocal DONE |
| g = gen() |
| await g.asend(None) |
| with self.assertRaises(StopAsyncIteration): |
| await g.asend(None) |
| DONE += 10 |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 11) |
| |
| def test_async_gen_asyncio_anext_tuple(self): |
| async def foo(): |
| try: |
| yield (1,) |
| except ZeroDivisionError: |
| yield (2,) |
| |
| async def run(): |
| it = foo().__aiter__() |
| |
| self.assertEqual(await it.__anext__(), (1,)) |
| with self.assertRaises(StopIteration) as cm: |
| it.__anext__().throw(ZeroDivisionError) |
| self.assertEqual(cm.exception.args[0], (2,)) |
| with self.assertRaises(StopAsyncIteration): |
| await it.__anext__() |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_asyncio_anext_stopiteration(self): |
| async def foo(): |
| try: |
| yield StopIteration(1) |
| except ZeroDivisionError: |
| yield StopIteration(3) |
| |
| async def run(): |
| it = foo().__aiter__() |
| |
| v = await it.__anext__() |
| self.assertIsInstance(v, StopIteration) |
| self.assertEqual(v.value, 1) |
| with self.assertRaises(StopIteration) as cm: |
| it.__anext__().throw(ZeroDivisionError) |
| v = cm.exception.args[0] |
| self.assertIsInstance(v, StopIteration) |
| self.assertEqual(v.value, 3) |
| with self.assertRaises(StopAsyncIteration): |
| await it.__anext__() |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_asyncio_aclose_06(self): |
| async def foo(): |
| try: |
| yield 1 |
| 1 / 0 |
| finally: |
| await asyncio.sleep(0.01) |
| yield 12 |
| |
| async def run(): |
| gen = foo() |
| it = gen.__aiter__() |
| await it.__anext__() |
| await gen.aclose() |
| |
| with self.assertRaisesRegex( |
| RuntimeError, |
| "async generator ignored GeneratorExit"): |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_asyncio_aclose_07(self): |
| DONE = 0 |
| |
| async def foo(): |
| nonlocal DONE |
| try: |
| yield 1 |
| 1 / 0 |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE += 1 |
| DONE += 1000 |
| |
| async def run(): |
| gen = foo() |
| it = gen.__aiter__() |
| await it.__anext__() |
| await gen.aclose() |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_aclose_08(self): |
| DONE = 0 |
| |
| fut = asyncio.Future(loop=self.loop) |
| |
| async def foo(): |
| nonlocal DONE |
| try: |
| yield 1 |
| await fut |
| DONE += 1000 |
| yield 2 |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE += 1 |
| DONE += 1000 |
| |
| async def run(): |
| gen = foo() |
| it = gen.__aiter__() |
| self.assertEqual(await it.__anext__(), 1) |
| await gen.aclose() |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| # Silence ResourceWarnings |
| fut.cancel() |
| self.loop.run_until_complete(asyncio.sleep(0.01)) |
| |
def test_async_gen_asyncio_gc_aclose_09(self):
    DONE = 0

    async def gen():
        nonlocal DONE
        try:
            while True:
                yield 1
        finally:
            await asyncio.sleep(0.01)
            await asyncio.sleep(0.01)
            DONE = 1

    async def run():
        agen = gen()
        await agen.__anext__()
        await agen.__anext__()
        # Drop the only reference to the half-consumed generator.
        del agen
        gc_collect()  # For PyPy or other GCs.

        # Keep the loop running for a while after the collection.
        await asyncio.sleep(0.1)

    self.loop.run_until_complete(run())
    self.assertEqual(DONE, 1)
| |
| def test_async_gen_asyncio_aclose_10(self): |
| DONE = 0 |
| |
| # test synchronous generators |
| def foo(): |
| try: |
| yield |
| except: |
| pass |
| g = foo() |
| g.send(None) |
| g.close() |
| |
| # now with asynchronous generators |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| yield |
| except: |
| pass |
| DONE = 1 |
| |
| async def run(): |
| nonlocal DONE |
| g = gen() |
| await g.asend(None) |
| await g.aclose() |
| DONE += 10 |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 11) |
| |
| def test_async_gen_asyncio_aclose_11(self): |
| DONE = 0 |
| |
| # test synchronous generators |
| def foo(): |
| try: |
| yield |
| except: |
| pass |
| yield |
| g = foo() |
| g.send(None) |
| with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'): |
| g.close() |
| |
| # now with asynchronous generators |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| yield |
| except: |
| pass |
| yield |
| DONE += 1 |
| |
| async def run(): |
| nonlocal DONE |
| g = gen() |
| await g.asend(None) |
| with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'): |
| await g.aclose() |
| DONE += 10 |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 10) |
| |
| def test_async_gen_asyncio_aclose_12(self): |
| DONE = 0 |
| |
| async def target(): |
| await asyncio.sleep(0.01) |
| 1 / 0 |
| |
| async def foo(): |
| nonlocal DONE |
| task = asyncio.create_task(target()) |
| try: |
| yield 1 |
| finally: |
| try: |
| await task |
| except ZeroDivisionError: |
| DONE = 1 |
| |
| async def run(): |
| gen = foo() |
| it = gen.__aiter__() |
| await it.__anext__() |
| await gen.aclose() |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_asend_01(self): |
| DONE = 0 |
| |
| # Sanity check: |
| def sgen(): |
| v = yield 1 |
| yield v * 2 |
| sg = sgen() |
| v = sg.send(None) |
| self.assertEqual(v, 1) |
| v = sg.send(100) |
| self.assertEqual(v, 200) |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| await asyncio.sleep(0.01) |
| v = yield 1 |
| await asyncio.sleep(0.01) |
| yield v * 2 |
| await asyncio.sleep(0.01) |
| return |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE = 1 |
| |
| async def run(): |
| g = gen() |
| |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| |
| v = await g.asend(100) |
| self.assertEqual(v, 200) |
| |
| with self.assertRaises(StopAsyncIteration): |
| await g.asend(None) |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_asend_02(self): |
| DONE = 0 |
| |
| async def sleep_n_crash(delay): |
| await asyncio.sleep(delay) |
| 1 / 0 |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| await asyncio.sleep(0.01) |
| v = yield 1 |
| await sleep_n_crash(0.01) |
| DONE += 1000 |
| yield v * 2 |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE = 1 |
| |
| async def run(): |
| g = gen() |
| |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| |
| await g.asend(100) |
| |
| with self.assertRaises(ZeroDivisionError): |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_asend_03(self): |
| DONE = 0 |
| |
| async def sleep_n_crash(delay): |
| fut = asyncio.ensure_future(asyncio.sleep(delay), |
| loop=self.loop) |
| self.loop.call_later(delay / 2, lambda: fut.cancel()) |
| return await fut |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| await asyncio.sleep(0.01) |
| v = yield 1 |
| await sleep_n_crash(0.01) |
| DONE += 1000 |
| yield v * 2 |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE = 1 |
| |
| async def run(): |
| g = gen() |
| |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| |
| await g.asend(100) |
| |
| with self.assertRaises(asyncio.CancelledError): |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_athrow_01(self): |
| DONE = 0 |
| |
| class FooEr(Exception): |
| pass |
| |
| # Sanity check: |
| def sgen(): |
| try: |
| v = yield 1 |
| except FooEr: |
| v = 1000 |
| yield v * 2 |
| sg = sgen() |
| v = sg.send(None) |
| self.assertEqual(v, 1) |
| v = sg.throw(FooEr) |
| self.assertEqual(v, 2000) |
| with self.assertRaises(StopIteration): |
| sg.send(None) |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| await asyncio.sleep(0.01) |
| try: |
| v = yield 1 |
| except FooEr: |
| v = 1000 |
| await asyncio.sleep(0.01) |
| yield v * 2 |
| await asyncio.sleep(0.01) |
| # return |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE = 1 |
| |
| async def run(): |
| g = gen() |
| |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| |
| v = await g.athrow(FooEr) |
| self.assertEqual(v, 2000) |
| |
| with self.assertRaises(StopAsyncIteration): |
| await g.asend(None) |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_athrow_02(self): |
| DONE = 0 |
| |
| class FooEr(Exception): |
| pass |
| |
| async def sleep_n_crash(delay): |
| fut = asyncio.ensure_future(asyncio.sleep(delay), |
| loop=self.loop) |
| self.loop.call_later(delay / 2, lambda: fut.cancel()) |
| return await fut |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| await asyncio.sleep(0.01) |
| try: |
| v = yield 1 |
| except FooEr: |
| await sleep_n_crash(0.01) |
| yield v * 2 |
| await asyncio.sleep(0.01) |
| # return |
| finally: |
| await asyncio.sleep(0.01) |
| await asyncio.sleep(0.01) |
| DONE = 1 |
| |
| async def run(): |
| g = gen() |
| |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| |
| try: |
| await g.athrow(FooEr) |
| except asyncio.CancelledError: |
| self.assertEqual(DONE, 1) |
| raise |
| else: |
| self.fail('CancelledError was not raised') |
| |
| with self.assertRaises(asyncio.CancelledError): |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 1) |
| |
| def test_async_gen_asyncio_athrow_03(self): |
| DONE = 0 |
| |
| # test synchronous generators |
| def foo(): |
| try: |
| yield |
| except: |
| pass |
| g = foo() |
| g.send(None) |
| with self.assertRaises(StopIteration): |
| g.throw(ValueError) |
| |
| # now with asynchronous generators |
| |
| async def gen(): |
| nonlocal DONE |
| try: |
| yield |
| except: |
| pass |
| DONE = 1 |
| |
| async def run(): |
| nonlocal DONE |
| g = gen() |
| await g.asend(None) |
| with self.assertRaises(StopAsyncIteration): |
| await g.athrow(ValueError) |
| DONE += 10 |
| |
| self.loop.run_until_complete(run()) |
| self.assertEqual(DONE, 11) |
| |
| def test_async_gen_asyncio_athrow_tuple(self): |
| async def gen(): |
| try: |
| yield 1 |
| except ZeroDivisionError: |
| yield (2,) |
| |
| async def run(): |
| g = gen() |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| v = await g.athrow(ZeroDivisionError) |
| self.assertEqual(v, (2,)) |
| with self.assertRaises(StopAsyncIteration): |
| await g.asend(None) |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_asyncio_athrow_stopiteration(self): |
| async def gen(): |
| try: |
| yield 1 |
| except ZeroDivisionError: |
| yield StopIteration(2) |
| |
| async def run(): |
| g = gen() |
| v = await g.asend(None) |
| self.assertEqual(v, 1) |
| v = await g.athrow(ZeroDivisionError) |
| self.assertIsInstance(v, StopIteration) |
| self.assertEqual(v.value, 2) |
| with self.assertRaises(StopAsyncIteration): |
| await g.asend(None) |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_asyncio_shutdown_01(self): |
| finalized = 0 |
| |
| async def waiter(timeout): |
| nonlocal finalized |
| try: |
| await asyncio.sleep(timeout) |
| yield 1 |
| finally: |
| await asyncio.sleep(0) |
| finalized += 1 |
| |
| async def wait(): |
| async for _ in waiter(1): |
| pass |
| |
| t1 = self.loop.create_task(wait()) |
| t2 = self.loop.create_task(wait()) |
| |
| self.loop.run_until_complete(asyncio.sleep(0.1)) |
| |
| # Silence warnings |
| t1.cancel() |
| t2.cancel() |
| |
| with self.assertRaises(asyncio.CancelledError): |
| self.loop.run_until_complete(t1) |
| with self.assertRaises(asyncio.CancelledError): |
| self.loop.run_until_complete(t2) |
| |
| self.loop.run_until_complete(self.loop.shutdown_asyncgens()) |
| |
| self.assertEqual(finalized, 2) |
| |
| def test_async_gen_asyncio_shutdown_02(self): |
| messages = [] |
| |
| def exception_handler(loop, context): |
| messages.append(context) |
| |
| async def async_iterate(): |
| yield 1 |
| yield 2 |
| |
| it = async_iterate() |
| async def main(): |
| loop = asyncio.get_running_loop() |
| loop.set_exception_handler(exception_handler) |
| |
| async for i in it: |
| break |
| |
| asyncio.run(main()) |
| |
| self.assertEqual(messages, []) |
| |
| def test_async_gen_asyncio_shutdown_exception_01(self): |
| messages = [] |
| |
| def exception_handler(loop, context): |
| messages.append(context) |
| |
| async def async_iterate(): |
| try: |
| yield 1 |
| yield 2 |
| finally: |
| 1/0 |
| |
| it = async_iterate() |
| async def main(): |
| loop = asyncio.get_running_loop() |
| loop.set_exception_handler(exception_handler) |
| |
| async for i in it: |
| break |
| |
| asyncio.run(main()) |
| |
| message, = messages |
| self.assertEqual(message['asyncgen'], it) |
| self.assertIsInstance(message['exception'], ZeroDivisionError) |
| self.assertIn('an error occurred during closing of asynchronous generator', |
| message['message']) |
| |
def test_async_gen_asyncio_shutdown_exception_02(self):
    """An error raised while finalizing a collected generator is reported.

    The generator's ``finally`` block divides by zero.  The generator is
    never bound to a name; ``gc_collect()`` is called right after the loop
    abandons it.  Exactly one context must reach the exception handler,
    carrying the ``ZeroDivisionError`` and the "asyncio.run() shutdown"
    message.
    """
    reported = []

    async def async_iterate():
        try:
            yield 1
            yield 2
        finally:
            1/0

    async def main():
        # Record every context passed to the loop's exception handler.
        asyncio.get_running_loop().set_exception_handler(
            lambda loop, context: reported.append(context))

        async for _ in async_iterate():
            break
        gc_collect()

    asyncio.run(main())

    # Exactly one report is expected; unpacking fails otherwise.
    [context] = reported
    self.assertIsInstance(context['exception'], ZeroDivisionError)
    self.assertIn('unhandled exception during asyncio.run() shutdown',
                  context['message'])
| |
| def test_async_gen_expression_01(self): |
| async def arange(n): |
| for i in range(n): |
| await asyncio.sleep(0.01) |
| yield i |
| |
| def make_arange(n): |
| # This syntax is legal starting with Python 3.7 |
| return (i * 2 async for i in arange(n)) |
| |
| async def run(): |
| return [i async for i in make_arange(10)] |
| |
| res = self.loop.run_until_complete(run()) |
| self.assertEqual(res, [i * 2 for i in range(10)]) |
| |
| def test_async_gen_expression_02(self): |
| async def wrap(n): |
| await asyncio.sleep(0.01) |
| return n |
| |
| def make_arange(n): |
| # This syntax is legal starting with Python 3.7 |
| return (i * 2 for i in range(n) if await wrap(i)) |
| |
| async def run(): |
| return [i async for i in make_arange(10)] |
| |
| res = self.loop.run_until_complete(run()) |
| self.assertEqual(res, [i * 2 for i in range(1, 10)]) |
| |
| def test_asyncgen_nonstarted_hooks_are_cancellable(self): |
| # See https://bugs.python.org/issue38013 |
| messages = [] |
| |
| def exception_handler(loop, context): |
| messages.append(context) |
| |
| async def async_iterate(): |
| yield 1 |
| yield 2 |
| |
| async def main(): |
| loop = asyncio.get_running_loop() |
| loop.set_exception_handler(exception_handler) |
| |
| async for i in async_iterate(): |
| break |
| |
| asyncio.run(main()) |
| |
| self.assertEqual([], messages) |
| |
| def test_async_gen_await_same_anext_coro_twice(self): |
| async def async_iterate(): |
| yield 1 |
| yield 2 |
| |
| async def run(): |
| it = async_iterate() |
| nxt = it.__anext__() |
| await nxt |
| with self.assertRaisesRegex( |
| RuntimeError, |
| r"cannot reuse already awaited __anext__\(\)/asend\(\)" |
| ): |
| await nxt |
| |
| await it.aclose() # prevent unfinished iterator warning |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_await_same_aclose_coro_twice(self): |
| async def async_iterate(): |
| yield 1 |
| yield 2 |
| |
| async def run(): |
| it = async_iterate() |
| nxt = it.aclose() |
| await nxt |
| with self.assertRaisesRegex( |
| RuntimeError, |
| r"cannot reuse already awaited aclose\(\)/athrow\(\)" |
| ): |
| await nxt |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_aclose_twice_with_different_coros(self): |
| # Regression test for https://bugs.python.org/issue39606 |
| async def async_iterate(): |
| yield 1 |
| yield 2 |
| |
| async def run(): |
| it = async_iterate() |
| await it.aclose() |
| await it.aclose() |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_aclose_after_exhaustion(self): |
| # Regression test for https://bugs.python.org/issue39606 |
| async def async_iterate(): |
| yield 1 |
| yield 2 |
| |
| async def run(): |
| it = async_iterate() |
| async for _ in it: |
| pass |
| await it.aclose() |
| |
| self.loop.run_until_complete(run()) |
| |
| def test_async_gen_aclose_compatible_with_get_stack(self): |
| async def async_generator(): |
| yield object() |
| |
| async def run(): |
| ag = async_generator() |
| asyncio.create_task(ag.aclose()) |
| tasks = asyncio.all_tasks() |
| for task in tasks: |
| # No AttributeError raised |
| task.get_stack() |
| |
| self.loop.run_until_complete(run()) |
| |
| |
# Run the test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()