Update Windows Python3 prebuilt
Fusion2: http://fusion2/e1653ce5-a00b-4501-8f0f-1cb270563905
GCS path: gs://ndk-kokoro-release-artifacts/prod/ndk/python3/windows_release/4/20220830-004242
Prebuilt updated using: ndk/scripts/update_kokoro_prebuilts.py
Test: Treehugger, Kokoro presubmit
Bug: 244197859
Change-Id: I7e3cb5b8add31fd59e4085dba550c6fb436f1ed8
diff --git a/Lib/asyncio/__init__.py b/Lib/asyncio/__init__.py
index eb84bfb..200b14c 100644
--- a/Lib/asyncio/__init__.py
+++ b/Lib/asyncio/__init__.py
@@ -20,10 +20,6 @@
from .threads import *
from .transports import *
-# Exposed for _asynciomodule.c to implement now deprecated
-# Task.all_tasks() method. This function will be removed in 3.9.
-from .tasks import _all_tasks_compat # NoQA
-
__all__ = (base_events.__all__ +
coroutines.__all__ +
events.__all__ +
diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py
index b2d446a..952da11 100644
--- a/Lib/asyncio/base_events.py
+++ b/Lib/asyncio/base_events.py
@@ -49,7 +49,7 @@
from .log import logger
-__all__ = 'BaseEventLoop',
+__all__ = 'BaseEventLoop', 'Server'
# Minimum number of _scheduled timer handles before cleanup of
@@ -202,6 +202,11 @@
pass
+def _check_ssl_socket(sock):
+ if ssl is not None and isinstance(sock, ssl.SSLSocket):
+ raise TypeError("Socket cannot be of type SSLSocket")
+
+
class _SendfileFallbackProtocol(protocols.Protocol):
def __init__(self, transp):
if not isinstance(transp, transports._FlowControlMixin):
@@ -350,7 +355,7 @@
self._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
- await tasks.sleep(0, loop=self._loop)
+ await tasks.sleep(0)
async def serve_forever(self):
if self._serving_forever_fut is not None:
@@ -541,8 +546,7 @@
results = await tasks.gather(
*[ag.aclose() for ag in closing_agens],
- return_exceptions=True,
- loop=self)
+ return_exceptions=True)
for result, agen in zip(results, closing_agens):
if isinstance(result, Exception):
@@ -864,6 +868,7 @@
*, fallback=True):
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
+ _check_ssl_socket(sock)
self._check_sendfile_params(sock, file, offset, count)
try:
return await self._sock_sendfile_native(sock, file,
@@ -973,7 +978,7 @@
happy_eyeballs_delay=None, interleave=None):
"""Connect to a TCP server.
- Create a streaming transport connection to a given Internet host and
+ Create a streaming transport connection to a given internet host and
port: socket family AF_INET or socket.AF_INET6 depending on host (or
family if specified), socket type SOCK_STREAM. protocol_factory must be
a callable returning a protocol instance.
@@ -1005,6 +1010,9 @@
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
+ if sock is not None:
+ _check_ssl_socket(sock)
+
if happy_eyeballs_delay is not None and interleave is None:
# If using happy eyeballs, default to interleave addresses by family
interleave = 1
@@ -1438,6 +1446,9 @@
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
+ if sock is not None:
+ _check_ssl_socket(sock)
+
if host is not None or port is not None:
if sock is not None:
raise ValueError(
@@ -1457,7 +1468,7 @@
fs = [self._create_server_getaddrinfo(host, port, family=family,
flags=flags)
for host in hosts]
- infos = await tasks.gather(*fs, loop=self)
+ infos = await tasks.gather(*fs)
infos = set(itertools.chain.from_iterable(infos))
completed = False
@@ -1515,7 +1526,7 @@
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
- await tasks.sleep(0, loop=self)
+ await tasks.sleep(0)
if self._debug:
logger.info("%r is serving", server)
@@ -1525,14 +1536,6 @@
self, protocol_factory, sock,
*, ssl=None,
ssl_handshake_timeout=None):
- """Handle an accepted connection.
-
- This is used by servers that accept connections outside of
- asyncio but that use asyncio to handle connections.
-
- This method is a coroutine. When completed, the coroutine
- returns a (transport, protocol) pair.
- """
if sock.type != socket.SOCK_STREAM:
raise ValueError(f'A Stream Socket was expected, got {sock!r}')
@@ -1540,6 +1543,9 @@
raise ValueError(
'ssl_handshake_timeout is only meaningful with ssl')
+ if sock is not None:
+ _check_ssl_socket(sock)
+
transport, protocol = await self._create_connection_transport(
sock, protocol_factory, ssl, '', server_side=True,
ssl_handshake_timeout=ssl_handshake_timeout)
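
The new _check_ssl_socket() helper above makes the sock-accepting APIs reject sockets that are already wrapped in ssl.SSLSocket. A minimal sketch of the effect, assuming Python 3.10+ semantics; the hostname is a placeholder and no connection is actually made, because the TypeError is raised before any I/O:

    import asyncio
    import socket
    import ssl

    async def main():
        raw = socket.socket()
        # Wrapping before connecting is allowed; the handshake is deferred.
        wrapped = ssl.create_default_context().wrap_socket(
            raw, server_hostname="example.org")   # placeholder hostname
        try:
            # sock_connect() now calls _check_ssl_socket() before doing anything else.
            await asyncio.get_running_loop().sock_connect(wrapped, ("example.org", 443))
        except TypeError as exc:
            print(exc)   # Socket cannot be of type SSLSocket
        finally:
            wrapped.close()

    asyncio.run(main())
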
diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py
index 0dce87b..5ab1acc 100644
--- a/Lib/asyncio/events.py
+++ b/Lib/asyncio/events.py
@@ -258,13 +258,13 @@
"""Notification that a TimerHandle has been cancelled."""
raise NotImplementedError
- def call_soon(self, callback, *args):
- return self.call_later(0, callback, *args)
+ def call_soon(self, callback, *args, context=None):
+ return self.call_later(0, callback, *args, context=context)
- def call_later(self, delay, callback, *args):
+ def call_later(self, delay, callback, *args, context=None):
raise NotImplementedError
- def call_at(self, when, callback, *args):
+ def call_at(self, when, callback, *args, context=None):
raise NotImplementedError
def time(self):
@@ -280,7 +280,7 @@
# Methods for interacting with threads.
- def call_soon_threadsafe(self, callback, *args):
+ def call_soon_threadsafe(self, callback, *args, context=None):
raise NotImplementedError
def run_in_executor(self, executor, func, *args):
@@ -418,6 +418,20 @@
"""
raise NotImplementedError
+ async def connect_accepted_socket(
+ self, protocol_factory, sock,
+ *, ssl=None,
+ ssl_handshake_timeout=None):
+ """Handle an accepted connection.
+
+ This is used by servers that accept connections outside of
+ asyncio, but use asyncio to handle connections.
+
+ This method is a coroutine. When completed, the coroutine
+ returns a (transport, protocol) pair.
+ """
+ raise NotImplementedError
+
async def create_datagram_endpoint(self, protocol_factory,
local_addr=None, remote_addr=None, *,
family=0, proto=0, flags=0,
@@ -465,7 +479,7 @@
# The reason to accept file-like object instead of just file descriptor
# is: we need to own pipe and close it at transport finishing
# Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
+ # close fd in pipe transport then close f and vice versa.
raise NotImplementedError
async def connect_write_pipe(self, protocol_factory, pipe):
@@ -478,7 +492,7 @@
# The reason to accept file-like object instead of just file descriptor
# is: we need to own pipe and close it at transport finishing
# Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
+ # close fd in pipe transport then close f and vice versa.
raise NotImplementedError
async def subprocess_shell(self, protocol_factory, cmd, *,
@@ -745,9 +759,16 @@
the result of `get_event_loop_policy().get_event_loop()` call.
"""
# NOTE: this function is implemented in C (see _asynciomodule.c)
+ return _py__get_event_loop()
+
+
+def _get_event_loop(stacklevel=3):
current_loop = _get_running_loop()
if current_loop is not None:
return current_loop
+ import warnings
+ warnings.warn('There is no current event loop',
+ DeprecationWarning, stacklevel=stacklevel)
return get_event_loop_policy().get_event_loop()
@@ -777,6 +798,7 @@
_py__set_running_loop = _set_running_loop
_py_get_running_loop = get_running_loop
_py_get_event_loop = get_event_loop
+_py__get_event_loop = _get_event_loop
try:
@@ -784,7 +806,7 @@
# functions in asyncio. Pure Python implementation is
# about 4 times slower than C-accelerated.
from _asyncio import (_get_running_loop, _set_running_loop,
- get_running_loop, get_event_loop)
+ get_running_loop, get_event_loop, _get_event_loop)
except ImportError:
pass
else:
@@ -793,3 +815,4 @@
_c__set_running_loop = _set_running_loop
_c_get_running_loop = get_running_loop
_c_get_event_loop = get_event_loop
+ _c__get_event_loop = _get_event_loop
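
Splitting get_event_loop() into a public wrapper plus a private _get_event_loop(stacklevel=...) is what lets asyncio warn when a loop is obtained implicitly while none is running. A hedged sketch of the observable behavior, calling the private helper directly purely for illustration:

    import asyncio
    import warnings

    # With no running event loop, the helper emits a DeprecationWarning and
    # then falls back to the policy's loop (created on demand on the main thread).
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        loop = asyncio.events._get_event_loop()

    print(loop)
    print([str(w.message) for w in caught])   # ['There is no current event loop']
    loop.close()
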
diff --git a/Lib/asyncio/futures.py b/Lib/asyncio/futures.py
index bed4da5..8e8cd87 100644
--- a/Lib/asyncio/futures.py
+++ b/Lib/asyncio/futures.py
@@ -8,6 +8,7 @@
import contextvars
import logging
import sys
+from types import GenericAlias
from . import base_futures
from . import events
@@ -76,7 +77,7 @@
the default event loop.
"""
if loop is None:
- self._loop = events.get_event_loop()
+ self._loop = events._get_event_loop()
else:
self._loop = loop
self._callbacks = []
@@ -106,8 +107,7 @@
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)
- def __class_getitem__(cls, type):
- return cls
+ __class_getitem__ = classmethod(GenericAlias)
@property
def _log_traceback(self):
@@ -115,7 +115,7 @@
@_log_traceback.setter
def _log_traceback(self, val):
- if bool(val):
+ if val:
raise ValueError('_log_traceback can only be set to False')
self.__log_traceback = False
@@ -408,7 +408,7 @@
assert isinstance(future, concurrent.futures.Future), \
f'concurrent.futures.Future is expected, got {future!r}'
if loop is None:
- loop = events.get_event_loop()
+ loop = events._get_event_loop()
new_future = loop.create_future()
_chain_future(future, new_future)
return new_future
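
One visible effect of switching __class_getitem__ to classmethod(GenericAlias): subscripting Future now yields a real parameterized alias instead of simply returning the class. The same pattern is applied to Queue and Task later in this patch. A tiny sketch, assuming Python 3.10+:

    import asyncio
    from types import GenericAlias

    alias = asyncio.Future[int]
    print(alias)                              # asyncio.futures.Future[int]
    print(type(alias) is GenericAlias)        # True
    print(alias.__origin__, alias.__args__)   # origin is Future, args is (int,)
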
diff --git a/Lib/asyncio/locks.py b/Lib/asyncio/locks.py
index f1ce732..4fef64e 100644
--- a/Lib/asyncio/locks.py
+++ b/Lib/asyncio/locks.py
@@ -3,10 +3,9 @@
__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
import collections
-import warnings
-from . import events
from . import exceptions
+from . import mixins
class _ContextManagerMixin:
@@ -20,7 +19,7 @@
self.release()
-class Lock(_ContextManagerMixin):
+class Lock(_ContextManagerMixin, mixins._LoopBoundMixin):
"""Primitive lock objects.
A primitive lock is a synchronization primitive that is not owned
@@ -74,16 +73,10 @@
"""
- def __init__(self, *, loop=None):
+ def __init__(self, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._waiters = None
self._locked = False
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -109,7 +102,7 @@
if self._waiters is None:
self._waiters = collections.deque()
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
# Finally block should be called before the CancelledError
@@ -161,7 +154,7 @@
fut.set_result(True)
-class Event:
+class Event(mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Event.
Class implementing event objects. An event manages a flag that can be set
@@ -170,16 +163,10 @@
false.
"""
- def __init__(self, *, loop=None):
+ def __init__(self, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._waiters = collections.deque()
self._value = False
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -220,7 +207,7 @@
if self._value:
return True
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
@@ -229,7 +216,7 @@
self._waiters.remove(fut)
-class Condition(_ContextManagerMixin):
+class Condition(_ContextManagerMixin, mixins._LoopBoundMixin):
"""Asynchronous equivalent to threading.Condition.
This class implements condition variable objects. A condition variable
@@ -239,19 +226,10 @@
A new Lock object is created and used as the underlying lock.
"""
- def __init__(self, lock=None, *, loop=None):
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ def __init__(self, lock=None, *, loop=mixins._marker):
+ super().__init__(loop=loop)
if lock is None:
- lock = Lock(loop=loop)
- elif lock._loop is not self._loop:
- raise ValueError("loop argument must agree with lock")
+ lock = Lock()
self._lock = lock
# Export the lock's locked(), acquire() and release() methods.
@@ -284,7 +262,7 @@
self.release()
try:
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
@@ -351,7 +329,7 @@
self.notify(len(self._waiters))
-class Semaphore(_ContextManagerMixin):
+class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
"""A Semaphore implementation.
A semaphore manages an internal counter which is decremented by each
@@ -366,18 +344,12 @@
ValueError is raised.
"""
- def __init__(self, value=1, *, loop=None):
+ def __init__(self, value=1, *, loop=mixins._marker):
+ super().__init__(loop=loop)
if value < 0:
raise ValueError("Semaphore initial value must be >= 0")
self._value = value
self._waiters = collections.deque()
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
def __repr__(self):
res = super().__repr__()
@@ -407,7 +379,7 @@
True.
"""
while self._value <= 0:
- fut = self._loop.create_future()
+ fut = self._get_loop().create_future()
self._waiters.append(fut)
try:
await fut
@@ -436,12 +408,7 @@
above the initial value.
"""
- def __init__(self, value=1, *, loop=None):
- if loop:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ def __init__(self, value=1, *, loop=mixins._marker):
self._bound_value = value
super().__init__(value, loop=loop)
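
With _LoopBoundMixin, the synchronization primitives no longer accept a loop argument at all: passing one raises TypeError, and the loop is instead captured lazily from the running loop on first use. A short sketch, assuming Python 3.10 behavior:

    import asyncio

    async def main():
        lock = asyncio.Lock()        # no loop argument; bound lazily when first awaited
        async with lock:
            pass

        try:
            asyncio.Lock(loop=asyncio.get_running_loop())
        except TypeError as exc:
            print(exc)               # "As of 3.10, the *loop* parameter was removed ..."

    asyncio.run(main())
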
diff --git a/Lib/asyncio/mixins.py b/Lib/asyncio/mixins.py
new file mode 100644
index 0000000..650df05
--- /dev/null
+++ b/Lib/asyncio/mixins.py
@@ -0,0 +1,31 @@
+"""Event loop mixins."""
+
+import threading
+from . import events
+
+_global_lock = threading.Lock()
+
+# Used as a sentinel for loop parameter
+_marker = object()
+
+
+class _LoopBoundMixin:
+ _loop = None
+
+ def __init__(self, *, loop=_marker):
+ if loop is not _marker:
+ raise TypeError(
+ f'As of 3.10, the *loop* parameter was removed from '
+ f'{type(self).__name__}() since it is no longer necessary'
+ )
+
+ def _get_loop(self):
+ loop = events._get_running_loop()
+
+ if self._loop is None:
+ with _global_lock:
+ if self._loop is None:
+ self._loop = loop
+ if loop is not self._loop:
+ raise RuntimeError(f'{self!r} is bound to a different event loop')
+ return loop
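
The mixin binds a primitive to whichever loop is running the first time _get_loop() is reached, and refuses reuse from a different loop afterwards. A sketch of that behavior using Event, whose wait() only reaches _get_loop() when the flag is not yet set, assuming Python 3.10 semantics:

    import asyncio

    event = asyncio.Event()          # created with no loop; binds lazily

    async def waiter():
        await event.wait()           # flag unset, so _get_loop() binds this loop

    async def first_loop():
        task = asyncio.create_task(waiter())
        await asyncio.sleep(0)       # let waiter() run and bind the Event
        event.set()
        await task

    asyncio.run(first_loop())        # the Event is now bound to that loop

    async def second_loop():
        event.clear()
        await event.wait()           # a different running loop

    try:
        asyncio.run(second_loop())
    except RuntimeError as exc:
        print(exc)                   # "... is bound to a different event loop"
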
diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py
index b4cd414..411685b 100644
--- a/Lib/asyncio/proactor_events.py
+++ b/Lib/asyncio/proactor_events.py
@@ -158,7 +158,7 @@
# end then it may fail with ERROR_NETNAME_DELETED if we
# just close our end. First calling shutdown() seems to
# cure it, but maybe using DisconnectEx() would be better.
- if hasattr(self._sock, 'shutdown'):
+ if hasattr(self._sock, 'shutdown') and self._sock.fileno() != -1:
self._sock.shutdown(socket.SHUT_RDWR)
self._sock.close()
self._sock = None
@@ -179,11 +179,12 @@
"""Transport for read pipes."""
def __init__(self, loop, sock, protocol, waiter=None,
- extra=None, server=None):
- self._pending_data = None
+ extra=None, server=None, buffer_size=65536):
+ self._pending_data_length = -1
self._paused = True
super().__init__(loop, sock, protocol, waiter, extra, server)
+ self._data = bytearray(buffer_size)
self._loop.call_soon(self._loop_reading)
self._paused = False
@@ -217,12 +218,12 @@
if self._read_fut is None:
self._loop.call_soon(self._loop_reading, None)
- data = self._pending_data
- self._pending_data = None
- if data is not None:
+ length = self._pending_data_length
+ self._pending_data_length = -1
+ if length > -1:
# Call the protocol methode after calling _loop_reading(),
# since the protocol can decide to pause reading again.
- self._loop.call_soon(self._data_received, data)
+ self._loop.call_soon(self._data_received, self._data[:length], length)
if self._loop.get_debug():
logger.debug("%r resumes reading", self)
@@ -243,15 +244,15 @@
if not keep_open:
self.close()
- def _data_received(self, data):
+ def _data_received(self, data, length):
if self._paused:
# Don't call any protocol method while reading is paused.
# The protocol will be called on resume_reading().
- assert self._pending_data is None
- self._pending_data = data
+ assert self._pending_data_length == -1
+ self._pending_data_length = length
return
- if not data:
+ if length == 0:
self._eof_received()
return
@@ -269,6 +270,7 @@
self._protocol.data_received(data)
def _loop_reading(self, fut=None):
+ length = -1
data = None
try:
if fut is not None:
@@ -277,18 +279,18 @@
self._read_fut = None
if fut.done():
# deliver data later in "finally" clause
- data = fut.result()
+ length = fut.result()
+ if length == 0:
+ # we got end-of-file so no need to reschedule a new read
+ return
+
+ data = self._data[:length]
else:
# the future will be replaced by next proactor.recv call
fut.cancel()
if self._closing:
# since close() has been called we ignore any read data
- data = None
- return
-
- if data == b'':
- # we got end-of-file so no need to reschedule a new read
return
# bpo-33694: buffer_updated() has currently no fast path because of
@@ -296,7 +298,7 @@
if not self._paused:
# reschedule a new read
- self._read_fut = self._loop._proactor.recv(self._sock, 32768)
+ self._read_fut = self._loop._proactor.recv_into(self._sock, self._data)
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
@@ -314,8 +316,8 @@
if not self._paused:
self._read_fut.add_done_callback(self._loop_reading)
finally:
- if data is not None:
- self._data_received(data)
+ if length > -1:
+ self._data_received(data, length)
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
@@ -450,7 +452,8 @@
self.close()
-class _ProactorDatagramTransport(_ProactorBasePipeTransport):
+class _ProactorDatagramTransport(_ProactorBasePipeTransport,
+ transports.DatagramTransport):
max_size = 256 * 1024
def __init__(self, loop, sock, protocol, address=None,
waiter=None, extra=None):
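
The read path above switches from proactor.recv(), which allocates a fresh bytes object per read, to recv_into() with a single reusable bytearray, tracking only the byte count. A standalone sketch of the same pattern on a plain blocking socket (not the proactor API itself), just to illustrate the buffer reuse:

    import socket

    def read_all(sock: socket.socket, buffer_size: int = 65536) -> bytes:
        data = bytearray(buffer_size)        # one buffer, reused for every read
        chunks = []
        while True:
            length = sock.recv_into(data)    # returns a count, not a new bytes object
            if length == 0:                  # end-of-file: stop reading
                break
            chunks.append(bytes(data[:length]))
        return b"".join(chunks)
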
diff --git a/Lib/asyncio/protocols.py b/Lib/asyncio/protocols.py
index 69fa43e..09987b1 100644
--- a/Lib/asyncio/protocols.py
+++ b/Lib/asyncio/protocols.py
@@ -109,10 +109,6 @@
class BufferedProtocol(BaseProtocol):
"""Interface for stream protocol with manual buffer control.
- Important: this has been added to asyncio in Python 3.7
- *on a provisional basis*! Consider it as an experimental API that
- might be changed or removed in Python 3.8.
-
Event methods, such as `create_server` and `create_connection`,
accept factories that return protocols that implement this interface.
diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py
index cd3f7c6..10dd689 100644
--- a/Lib/asyncio/queues.py
+++ b/Lib/asyncio/queues.py
@@ -2,10 +2,10 @@
import collections
import heapq
-import warnings
+from types import GenericAlias
-from . import events
from . import locks
+from . import mixins
class QueueEmpty(Exception):
@@ -18,7 +18,7 @@
pass
-class Queue:
+class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.
If maxsize is less than or equal to zero, the queue size is infinite. If it
@@ -30,14 +30,8 @@
interrupted between calling qsize() and doing an operation on the Queue.
"""
- def __init__(self, maxsize=0, *, loop=None):
- if loop is None:
- self._loop = events.get_event_loop()
- else:
- self._loop = loop
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ def __init__(self, maxsize=0, *, loop=mixins._marker):
+ super().__init__(loop=loop)
self._maxsize = maxsize
# Futures.
@@ -45,7 +39,7 @@
# Futures.
self._putters = collections.deque()
self._unfinished_tasks = 0
- self._finished = locks.Event(loop=loop)
+ self._finished = locks.Event()
self._finished.set()
self._init(maxsize)
@@ -76,8 +70,7 @@
def __str__(self):
return f'<{type(self).__name__} {self._format()}>'
- def __class_getitem__(cls, type):
- return cls
+ __class_getitem__ = classmethod(GenericAlias)
def _format(self):
result = f'maxsize={self._maxsize!r}'
@@ -122,7 +115,7 @@
slot is available before adding item.
"""
while self.full():
- putter = self._loop.create_future()
+ putter = self._get_loop().create_future()
self._putters.append(putter)
try:
await putter
@@ -160,7 +153,7 @@
If queue is empty, wait until an item is available.
"""
while self.empty():
- getter = self._loop.create_future()
+ getter = self._get_loop().create_future()
self._getters.append(getter)
try:
await getter
diff --git a/Lib/asyncio/runners.py b/Lib/asyncio/runners.py
index 268635d..9a5e9a4 100644
--- a/Lib/asyncio/runners.py
+++ b/Lib/asyncio/runners.py
@@ -60,8 +60,7 @@
for task in to_cancel:
task.cancel()
- loop.run_until_complete(
- tasks.gather(*to_cancel, loop=loop, return_exceptions=True))
+ loop.run_until_complete(tasks.gather(*to_cancel, return_exceptions=True))
for task in to_cancel:
if task.cancelled():
diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py
index 59cb6b1..71080b8 100644
--- a/Lib/asyncio/selector_events.py
+++ b/Lib/asyncio/selector_events.py
@@ -40,11 +40,6 @@
return bool(key.events & event)
-def _check_ssl_socket(sock):
- if ssl is not None and isinstance(sock, ssl.SSLSocket):
- raise TypeError("Socket cannot be of type SSLSocket")
-
-
class BaseSelectorEventLoop(base_events.BaseEventLoop):
"""Selector event loop.
@@ -357,7 +352,7 @@
The maximum amount of data to be received at once is specified by
nbytes.
"""
- _check_ssl_socket(sock)
+ base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
@@ -398,7 +393,7 @@
The received data is written into *buf* (a writable buffer).
The return value is the number of bytes written.
"""
- _check_ssl_socket(sock)
+ base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
@@ -439,7 +434,7 @@
raised, and there is no way to determine how much data, if any, was
successfully processed by the receiving end of the connection.
"""
- _check_ssl_socket(sock)
+ base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
try:
@@ -488,13 +483,15 @@
This method is a coroutine.
"""
- _check_ssl_socket(sock)
+ base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
if not hasattr(socket, 'AF_UNIX') or sock.family != socket.AF_UNIX:
resolved = await self._ensure_resolved(
- address, family=sock.family, proto=sock.proto, loop=self)
+ address, family=sock.family, type=sock.type, proto=sock.proto,
+ loop=self,
+ )
_, _, _, _, address = resolved[0]
fut = self.create_future()
@@ -553,7 +550,7 @@
object usable to send and receive data on the connection, and address
is the address bound to the socket on the other end of the connection.
"""
- _check_ssl_socket(sock)
+ base_events._check_ssl_socket(sock)
if self._debug and sock.gettimeout() != 0:
raise ValueError("the socket must be non-blocking")
fut = self.create_future()
diff --git a/Lib/asyncio/sslproto.py b/Lib/asyncio/sslproto.py
index cad25b2..00fc16c 100644
--- a/Lib/asyncio/sslproto.py
+++ b/Lib/asyncio/sslproto.py
@@ -367,6 +367,12 @@
"""Return the current size of the write buffer."""
return self._ssl_protocol._transport.get_write_buffer_size()
+ def get_write_buffer_limits(self):
+ """Get the high and low watermarks for write flow control.
+ Return a tuple (low, high) where low and high are
+ positive number of bytes."""
+ return self._ssl_protocol._transport.get_write_buffer_limits()
+
@property
def _protocol_paused(self):
# Required for sendfile fallback pause_writing/resume_writing logic
diff --git a/Lib/asyncio/streams.py b/Lib/asyncio/streams.py
index 3c80bb8..080d8a6 100644
--- a/Lib/asyncio/streams.py
+++ b/Lib/asyncio/streams.py
@@ -23,7 +23,7 @@
async def open_connection(host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""A wrapper for create_connection() returning a (reader, writer) pair.
The reader returned is a StreamReader instance; the writer is a
@@ -41,12 +41,7 @@
StreamReaderProtocol classes, just copy the code -- there's
really nothing special here except some convenience.)
"""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_connection(
@@ -56,7 +51,7 @@
async def start_server(client_connected_cb, host=None, port=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""Start a socket server, call back for each client connected.
The first parameter, `client_connected_cb`, takes two parameters:
@@ -78,12 +73,7 @@
The return value is the same as loop.create_server(), i.e. a
Server object which can be used to stop the service.
"""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
def factory():
reader = StreamReader(limit=limit, loop=loop)
@@ -98,14 +88,10 @@
# UNIX Domain Sockets are supported on this platform
async def open_unix_connection(path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `open_connection` but works with UNIX Domain Sockets."""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
+
reader = StreamReader(limit=limit, loop=loop)
protocol = StreamReaderProtocol(reader, loop=loop)
transport, _ = await loop.create_unix_connection(
@@ -114,14 +100,9 @@
return reader, writer
async def start_unix_server(client_connected_cb, path=None, *,
- loop=None, limit=_DEFAULT_LIMIT, **kwds):
+ limit=_DEFAULT_LIMIT, **kwds):
"""Similar to `start_server` but works with UNIX Domain Sockets."""
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
def factory():
reader = StreamReader(limit=limit, loop=loop)
@@ -144,7 +125,7 @@
def __init__(self, loop=None):
if loop is None:
- self._loop = events.get_event_loop()
+ self._loop = events._get_event_loop(stacklevel=4)
else:
self._loop = loop
self._paused = False
@@ -302,9 +283,13 @@
def __del__(self):
# Prevent reports about unhandled exceptions.
# Better than self._closed._log_traceback = False hack
- closed = self._closed
- if closed.done() and not closed.cancelled():
- closed.exception()
+ try:
+ closed = self._closed
+ except AttributeError:
+ pass # failed constructor
+ else:
+ if closed.done() and not closed.cancelled():
+ closed.exception()
class StreamWriter:
@@ -400,7 +385,7 @@
self._limit = limit
if loop is None:
- self._loop = events.get_event_loop()
+ self._loop = events._get_event_loop()
else:
self._loop = loop
self._buffer = bytearray()
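
open_connection(), start_server() and their UNIX-socket variants now resolve the loop with get_running_loop() instead of accepting loop=, so they must be awaited from inside a coroutine. A self-contained sketch, assuming Python 3.10+:

    import asyncio

    async def handle(reader, writer):
        writer.write(await reader.readline())   # echo one line back
        await writer.drain()
        writer.close()

    async def main():
        # Both calls pick up the running loop implicitly; loop= is gone.
        server = await asyncio.start_server(handle, "127.0.0.1", 0)
        host, port = server.sockets[0].getsockname()[:2]

        reader, writer = await asyncio.open_connection(host, port)
        writer.write(b"ping\n")
        await writer.drain()
        print(await reader.readline())          # b'ping\n'
        writer.close()
        await writer.wait_closed()

        server.close()
        await server.wait_closed()

    asyncio.run(main())
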
diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py
index c9506b1..cd10231 100644
--- a/Lib/asyncio/subprocess.py
+++ b/Lib/asyncio/subprocess.py
@@ -1,7 +1,6 @@
__all__ = 'create_subprocess_exec', 'create_subprocess_shell'
import subprocess
-import warnings
from . import events
from . import protocols
@@ -193,24 +192,14 @@
stderr = self._read_stream(2)
else:
stderr = self._noop()
- stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr,
- loop=self._loop)
+ stdin, stdout, stderr = await tasks.gather(stdin, stdout, stderr)
await self.wait()
return (stdout, stderr)
async def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
- loop=None, limit=streams._DEFAULT_LIMIT,
- **kwds):
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8 "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning,
- stacklevel=2
- )
-
+ limit=streams._DEFAULT_LIMIT, **kwds):
+ loop = events.get_running_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_shell(
@@ -221,16 +210,9 @@
async def create_subprocess_exec(program, *args, stdin=None, stdout=None,
- stderr=None, loop=None,
- limit=streams._DEFAULT_LIMIT, **kwds):
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8 "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning,
- stacklevel=2
- )
+ stderr=None, limit=streams._DEFAULT_LIMIT,
+ **kwds):
+ loop = events.get_running_loop()
protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
loop=loop)
transport, protocol = await loop.subprocess_exec(
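
create_subprocess_exec() and create_subprocess_shell() likewise drop loop= and call get_running_loop(), so they can only be awaited inside a running loop. A minimal sketch using the current interpreter as the child:

    import asyncio
    import sys

    async def main():
        proc = await asyncio.create_subprocess_exec(
            sys.executable, "-c", "print('hello from child')",
            stdout=asyncio.subprocess.PIPE)
        stdout, _ = await proc.communicate()
        print(stdout.decode().strip(), proc.returncode)   # hello from child 0

    asyncio.run(main())
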
diff --git a/Lib/asyncio/tasks.py b/Lib/asyncio/tasks.py
index f486b67..c4bedb5 100644
--- a/Lib/asyncio/tasks.py
+++ b/Lib/asyncio/tasks.py
@@ -17,6 +17,7 @@
import types
import warnings
import weakref
+from types import GenericAlias
from . import base_tasks
from . import coroutines
@@ -61,30 +62,6 @@
if futures._get_loop(t) is loop and not t.done()}
-def _all_tasks_compat(loop=None):
- # Different from "all_task()" by returning *all* Tasks, including
- # the completed ones. Used to implement deprecated "Tasks.all_task()"
- # method.
- if loop is None:
- loop = events.get_event_loop()
- # Looping over a WeakSet (_all_tasks) isn't safe as it can be updated from another
- # thread while we do so. Therefore we cast it to list prior to filtering. The list
- # cast itself requires iteration, so we repeat it several times ignoring
- # RuntimeErrors (which are not very likely to occur). See issues 34970 and 36607 for
- # details.
- i = 0
- while True:
- try:
- tasks = list(_all_tasks)
- except RuntimeError:
- i += 1
- if i >= 1000:
- raise
- else:
- break
- return {t for t in tasks if futures._get_loop(t) is loop}
-
-
def _set_task_name(task, name):
if name is not None:
try:
@@ -147,8 +124,7 @@
self._loop.call_exception_handler(context)
super().__del__()
- def __class_getitem__(cls, type):
- return cls
+ __class_getitem__ = classmethod(GenericAlias)
def _repr_info(self):
return base_tasks._task_repr_info(self)
@@ -370,7 +346,7 @@
ALL_COMPLETED = concurrent.futures.ALL_COMPLETED
-async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
+async def wait(fs, *, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the Futures and coroutines given by fs to complete.
The fs iterable must not be empty.
@@ -393,12 +369,7 @@
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
fs = set(fs)
@@ -418,7 +389,7 @@
waiter.set_result(None)
-async def wait_for(fut, timeout, *, loop=None):
+async def wait_for(fut, timeout):
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
@@ -431,12 +402,7 @@
This function is a coroutine.
"""
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events.get_running_loop()
if timeout is None:
return await fut
@@ -449,11 +415,9 @@
await _cancel_and_wait(fut, loop=loop)
try:
- fut.result()
+ return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
- else:
- raise exceptions.TimeoutError()
waiter = loop.create_future()
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
@@ -471,7 +435,10 @@
return fut.result()
else:
fut.remove_done_callback(cb)
- fut.cancel()
+ # We must ensure that the task is not running
+ # after wait_for() returns.
+ # See https://bugs.python.org/issue32751
+ await _cancel_and_wait(fut, loop=loop)
raise
if fut.done():
@@ -486,11 +453,9 @@
# exception, we should re-raise it
# See https://bugs.python.org/issue40607
try:
- fut.result()
+ return fut.result()
except exceptions.CancelledError as exc:
raise exceptions.TimeoutError() from exc
- else:
- raise exceptions.TimeoutError()
finally:
timeout_handle.cancel()
@@ -556,7 +521,7 @@
# This is *not* a @coroutine! It is just an iterator (yielding Futures).
-def as_completed(fs, *, loop=None, timeout=None):
+def as_completed(fs, *, timeout=None):
"""Return an iterator whose values are coroutines.
When waiting for the yielded coroutines you'll get the results (or
@@ -578,14 +543,9 @@
raise TypeError(f"expect an iterable of futures, not {type(fs).__name__}")
from .queues import Queue # Import here to avoid circular import problem.
- done = Queue(loop=loop)
+ done = Queue()
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events._get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
timeout_handle = None
@@ -630,19 +590,13 @@
yield
-async def sleep(delay, result=None, *, loop=None):
+async def sleep(delay, result=None):
"""Coroutine that completes after a given time (in seconds)."""
if delay <= 0:
await __sleep0()
return result
- if loop is None:
- loop = events.get_running_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
-
+ loop = events.get_running_loop()
future = loop.create_future()
h = loop.call_later(delay,
futures._set_result_unless_cancelled,
@@ -658,23 +612,32 @@
If the argument is a Future, it is returned directly.
"""
- if coroutines.iscoroutine(coro_or_future):
- if loop is None:
- loop = events.get_event_loop()
- task = loop.create_task(coro_or_future)
- if task._source_traceback:
- del task._source_traceback[-1]
- return task
- elif futures.isfuture(coro_or_future):
+ return _ensure_future(coro_or_future, loop=loop)
+
+
+def _ensure_future(coro_or_future, *, loop=None):
+ if futures.isfuture(coro_or_future):
if loop is not None and loop is not futures._get_loop(coro_or_future):
raise ValueError('The future belongs to a different loop than '
- 'the one specified as the loop argument')
+ 'the one specified as the loop argument')
return coro_or_future
- elif inspect.isawaitable(coro_or_future):
- return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
- else:
- raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
- 'required')
+ called_wrap_awaitable = False
+ if not coroutines.iscoroutine(coro_or_future):
+ if inspect.isawaitable(coro_or_future):
+ coro_or_future = _wrap_awaitable(coro_or_future)
+ called_wrap_awaitable = True
+ else:
+ raise TypeError('An asyncio.Future, a coroutine or an awaitable '
+ 'is required')
+
+ if loop is None:
+ loop = events._get_event_loop(stacklevel=4)
+ try:
+ return loop.create_task(coro_or_future)
+ except RuntimeError:
+ if not called_wrap_awaitable:
+ coro_or_future.close()
+ raise
@types.coroutine
@@ -697,7 +660,8 @@
cancelled.
"""
- def __init__(self, children, *, loop=None):
+ def __init__(self, children, *, loop):
+ assert loop is not None
super().__init__(loop=loop)
self._children = children
self._cancel_requested = False
@@ -717,7 +681,7 @@
return ret
-def gather(*coros_or_futures, loop=None, return_exceptions=False):
+def gather(*coros_or_futures, return_exceptions=False):
"""Return a future aggregating results from the given coroutines/futures.
Coroutines will be wrapped in a future and scheduled in the event
@@ -748,12 +712,7 @@
gather won't cancel any other awaitables.
"""
if not coros_or_futures:
- if loop is None:
- loop = events.get_event_loop()
- else:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
+ loop = events._get_event_loop()
outer = loop.create_future()
outer.set_result([])
return outer
@@ -762,7 +721,7 @@
nonlocal nfinished
nfinished += 1
- if outer.done():
+ if outer is None or outer.done():
if not fut.cancelled():
# Mark exception retrieved.
fut.exception()
@@ -817,9 +776,11 @@
children = []
nfuts = 0
nfinished = 0
+ loop = None
+ outer = None # bpo-46672
for arg in coros_or_futures:
if arg not in arg_to_fut:
- fut = ensure_future(arg, loop=loop)
+ fut = _ensure_future(arg, loop=loop)
if loop is None:
loop = futures._get_loop(fut)
if fut is not arg:
@@ -843,7 +804,7 @@
return outer
-def shield(arg, *, loop=None):
+def shield(arg):
"""Wait for a future, shielding it from cancellation.
The statement
@@ -869,11 +830,7 @@
except CancelledError:
res = None
"""
- if loop is not None:
- warnings.warn("The loop argument is deprecated since Python 3.8, "
- "and scheduled for removal in Python 3.10.",
- DeprecationWarning, stacklevel=2)
- inner = ensure_future(arg, loop=loop)
+ inner = _ensure_future(arg)
if inner.done():
# Shortcut.
return inner
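
Beyond dropping loop= everywhere, the wait_for() change is behavioral: on timeout it now awaits _cancel_and_wait(), so the wrapped task has fully finished cancelling before TimeoutError reaches the caller (bpo-32751), and a result that raced in just before cancellation is returned rather than discarded. A sketch of the timeout path, assuming Python 3.10 behavior:

    import asyncio

    async def slow():
        try:
            await asyncio.sleep(10)
        except asyncio.CancelledError:
            # This cleanup is guaranteed to have completed by the time
            # wait_for() raises TimeoutError to its caller.
            print("inner task cancelled and cleaned up")
            raise

    async def main():
        try:
            await asyncio.wait_for(slow(), timeout=0.1)
        except asyncio.TimeoutError:
            print("timed out after the inner task finished cancelling")

    asyncio.run(main())
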
diff --git a/Lib/asyncio/threads.py b/Lib/asyncio/threads.py
index 34b7513..db048a8 100644
--- a/Lib/asyncio/threads.py
+++ b/Lib/asyncio/threads.py
@@ -13,7 +13,7 @@
"""Asynchronously run function *func* in a separate thread.
Any *args and **kwargs supplied for this function are directly passed
- to *func*. Also, the current :class:`contextvars.Context` is propogated,
+ to *func*. Also, the current :class:`contextvars.Context` is propagated,
allowing context variables from the main thread to be accessed in the
separate thread.
diff --git a/Lib/asyncio/transports.py b/Lib/asyncio/transports.py
index 45e155c..73b1fa2 100644
--- a/Lib/asyncio/transports.py
+++ b/Lib/asyncio/transports.py
@@ -99,6 +99,12 @@
"""Return the current size of the write buffer."""
raise NotImplementedError
+ def get_write_buffer_limits(self):
+ """Get the high and low watermarks for write flow control.
+ Return a tuple (low, high) where low and high are
+ positive number of bytes."""
+ raise NotImplementedError
+
def write(self, data):
"""Write some data bytes to the transport.
diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py
index f34a5b4..c88b818 100644
--- a/Lib/asyncio/unix_events.py
+++ b/Lib/asyncio/unix_events.py
@@ -44,6 +44,16 @@
pass
+def waitstatus_to_exitcode(status):
+ try:
+ return os.waitstatus_to_exitcode(status)
+ except ValueError:
+ # The child exited, but we don't understand its status.
+ # This shouldn't happen, but if it does, let's just
+ # return that status; perhaps that helps debug it.
+ return status
+
+
class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
"""Unix event loop.
@@ -323,7 +333,7 @@
server._start_serving()
# Skip one loop iteration so that all 'loop.add_reader'
# go through.
- await tasks.sleep(0, loop=self)
+ await tasks.sleep(0)
return server
@@ -941,7 +951,7 @@
" will report returncode 255",
pid)
else:
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
os.close(pidfd)
callback(pid, returncode, *args)
@@ -956,20 +966,6 @@
return True
-def _compute_returncode(status):
- if os.WIFSIGNALED(status):
- # The child process died because of a signal.
- return -os.WTERMSIG(status)
- elif os.WIFEXITED(status):
- # The child process exited (e.g sys.exit()).
- return os.WEXITSTATUS(status)
- else:
- # The child exited, but we don't understand its status.
- # This shouldn't happen, but if it does, let's just
- # return that status; perhaps that helps debug it.
- return status
-
-
class BaseChildWatcher(AbstractChildWatcher):
def __init__(self):
@@ -1080,7 +1076,7 @@
# The child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
@@ -1173,7 +1169,7 @@
# A child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
with self._lock:
try:
@@ -1230,13 +1226,15 @@
def close(self):
self._callbacks.clear()
- if self._saved_sighandler is not None:
- handler = signal.getsignal(signal.SIGCHLD)
- if handler != self._sig_chld:
- logger.warning("SIGCHLD handler was changed by outside code")
- else:
- signal.signal(signal.SIGCHLD, self._saved_sighandler)
- self._saved_sighandler = None
+ if self._saved_sighandler is None:
+ return
+
+ handler = signal.getsignal(signal.SIGCHLD)
+ if handler != self._sig_chld:
+ logger.warning("SIGCHLD handler was changed by outside code")
+ else:
+ signal.signal(signal.SIGCHLD, self._saved_sighandler)
+ self._saved_sighandler = None
def __enter__(self):
return self
@@ -1263,15 +1261,17 @@
# The reason to do it here is that attach_loop() is called from
# unix policy only for the main thread.
# Main thread is required for subscription on SIGCHLD signal
- if self._saved_sighandler is None:
- self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
- if self._saved_sighandler is None:
- logger.warning("Previous SIGCHLD handler was set by non-Python code, "
- "restore to default handler on watcher close.")
- self._saved_sighandler = signal.SIG_DFL
+ if self._saved_sighandler is not None:
+ return
- # Set SA_RESTART to limit EINTR occurrences.
- signal.siginterrupt(signal.SIGCHLD, False)
+ self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
+ if self._saved_sighandler is None:
+ logger.warning("Previous SIGCHLD handler was set by non-Python code, "
+ "restore to default handler on watcher close.")
+ self._saved_sighandler = signal.SIG_DFL
+
+ # Set SA_RESTART to limit EINTR occurrences.
+ signal.siginterrupt(signal.SIGCHLD, False)
def _do_waitpid_all(self):
for pid in list(self._callbacks):
@@ -1296,7 +1296,7 @@
# The child process is still alive.
return
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
debug_log = True
try:
loop, callback, args = self._callbacks.pop(pid)
@@ -1379,7 +1379,7 @@
def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
- # abstract base classe requires it
+ # abstract base classes require it.
return True
def attach_loop(self, loop):
@@ -1399,7 +1399,7 @@
"Unknown child process pid %d, will report returncode 255",
pid)
else:
- returncode = _compute_returncode(status)
+ returncode = waitstatus_to_exitcode(status)
if loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
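
The private _compute_returncode() is replaced by a thin wrapper over os.waitstatus_to_exitcode() (available since Python 3.9), which encodes the same convention: a non-negative exit code for a normal exit, a negated signal number for a signal death. A Unix-only sketch:

    import os
    import sys

    # Spawn a child that exits with status 3, then translate its raw wait status.
    pid = os.spawnl(os.P_NOWAIT, sys.executable, sys.executable,
                    "-c", "raise SystemExit(3)")
    _, status = os.waitpid(pid, 0)

    print(os.waitstatus_to_exitcode(status))   # 3; a signal death would print e.g. -9
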
diff --git a/Lib/asyncio/windows_events.py b/Lib/asyncio/windows_events.py
index 5e7cd79..da81ab4 100644
--- a/Lib/asyncio/windows_events.py
+++ b/Lib/asyncio/windows_events.py
@@ -1,5 +1,10 @@
"""Selector and proactor event loops for Windows."""
+import sys
+
+if sys.platform != 'win32': # pragma: no cover
+ raise ImportError('win32 only')
+
import _overlapped
import _winapi
import errno