feat: First batch of AIO integration (#26)
This change includes:
* Nox configuration support for AsynciO unit tests
* No pre release gRPC Python required
* AsyncIO retry module
* AsyncIO config parsing module
* Exception parsing patch
* Corresponding unit test cases
diff --git a/docs/retry.rst b/docs/retry.rst
index 23a7d70..97a7f2c 100644
--- a/docs/retry.rst
+++ b/docs/retry.rst
@@ -4,3 +4,10 @@
.. automodule:: google.api_core.retry
:members:
:show-inheritance:
+
+Retry in AsyncIO
+----------------
+
+.. automodule:: google.api_core.retry_async
+ :members:
+ :show-inheritance:
diff --git a/google/api_core/exceptions.py b/google/api_core/exceptions.py
index eed4ee4..d1459ab 100644
--- a/google/api_core/exceptions.py
+++ b/google/api_core/exceptions.py
@@ -444,6 +444,10 @@
return error
+def _is_informative_grpc_error(rpc_exc):
+ return hasattr(rpc_exc, "code") and hasattr(rpc_exc, "details")
+
+
def from_grpc_error(rpc_exc):
"""Create a :class:`GoogleAPICallError` from a :class:`grpc.RpcError`.
@@ -454,7 +458,9 @@
GoogleAPICallError: An instance of the appropriate subclass of
:class:`GoogleAPICallError`.
"""
- if isinstance(rpc_exc, grpc.Call):
+ # NOTE(lidiz) All gRPC error shares the parent class grpc.RpcError.
+ # However, check for grpc.RpcError breaks backward compatibility.
+ if isinstance(rpc_exc, grpc.Call) or _is_informative_grpc_error(rpc_exc):
return from_grpc_status(
rpc_exc.code(), rpc_exc.details(), errors=(rpc_exc,), response=rpc_exc
)
diff --git a/google/api_core/gapic_v1/__init__.py b/google/api_core/gapic_v1/__init__.py
index e7a7a68..e47d2cb 100644
--- a/google/api_core/gapic_v1/__init__.py
+++ b/google/api_core/gapic_v1/__init__.py
@@ -12,9 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import sys
+
from google.api_core.gapic_v1 import client_info
from google.api_core.gapic_v1 import config
from google.api_core.gapic_v1 import method
from google.api_core.gapic_v1 import routing_header
__all__ = ["client_info", "config", "method", "routing_header"]
+
+if sys.version_info >= (3, 6):
+ from google.api_core.gapic_v1 import config_async # noqa: F401
+ __all__.append("config_async")
diff --git a/google/api_core/gapic_v1/config.py b/google/api_core/gapic_v1/config.py
index 3a3eb15..2a56cf1 100644
--- a/google/api_core/gapic_v1/config.py
+++ b/google/api_core/gapic_v1/config.py
@@ -45,7 +45,7 @@
return exceptions.exception_class_for_grpc_status(getattr(grpc.StatusCode, name))
-def _retry_from_retry_config(retry_params, retry_codes):
+def _retry_from_retry_config(retry_params, retry_codes, retry_impl=retry.Retry):
"""Creates a Retry object given a gapic retry configuration.
Args:
@@ -70,7 +70,7 @@
exception_classes = [
_exception_class_for_grpc_status_name(code) for code in retry_codes
]
- return retry.Retry(
+ return retry_impl(
retry.if_exception_type(*exception_classes),
initial=(retry_params["initial_retry_delay_millis"] / _MILLIS_PER_SECOND),
maximum=(retry_params["max_retry_delay_millis"] / _MILLIS_PER_SECOND),
@@ -110,7 +110,7 @@
MethodConfig = collections.namedtuple("MethodConfig", ["retry", "timeout"])
-def parse_method_configs(interface_config):
+def parse_method_configs(interface_config, retry_impl=retry.Retry):
"""Creates default retry and timeout objects for each method in a gapic
interface config.
@@ -120,6 +120,8 @@
an interface named ``google.example.v1.ExampleService`` you would
pass in just that interface's configuration, for example
``gapic_config['interfaces']['google.example.v1.ExampleService']``.
+ retry_impl (Callable): The constructor that creates a retry decorator
+ that will be applied to the method based on method configs.
Returns:
Mapping[str, MethodConfig]: A mapping of RPC method names to their
@@ -151,7 +153,7 @@
if retry_params_name is not None:
retry_params = retry_params_map[retry_params_name]
retry_ = _retry_from_retry_config(
- retry_params, retry_codes_map[method_params["retry_codes_name"]]
+ retry_params, retry_codes_map[method_params["retry_codes_name"]], retry_impl
)
timeout_ = _timeout_from_retry_config(retry_params)
diff --git a/google/api_core/gapic_v1/config_async.py b/google/api_core/gapic_v1/config_async.py
new file mode 100644
index 0000000..00e5e24
--- /dev/null
+++ b/google/api_core/gapic_v1/config_async.py
@@ -0,0 +1,42 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""AsyncIO helpers for loading gapic configuration data.
+
+The Google API generator creates supplementary configuration for each RPC
+method to tell the client library how to deal with retries and timeouts.
+"""
+
+from google.api_core import retry_async
+from google.api_core.gapic_v1 import config
+from google.api_core.gapic_v1.config import MethodConfig # noqa: F401
+
+
+def parse_method_configs(interface_config):
+ """Creates default retry and timeout objects for each method in a gapic
+ interface config with AsyncIO semantics.
+
+ Args:
+ interface_config (Mapping): The interface config section of the full
+ gapic library config. For example, If the full configuration has
+ an interface named ``google.example.v1.ExampleService`` you would
+ pass in just that interface's configuration, for example
+ ``gapic_config['interfaces']['google.example.v1.ExampleService']``.
+
+ Returns:
+ Mapping[str, MethodConfig]: A mapping of RPC method names to their
+ configuration.
+ """
+ return config.parse_method_configs(
+ interface_config,
+ retry_impl=retry_async.AsyncRetry)
diff --git a/google/api_core/retry_async.py b/google/api_core/retry_async.py
new file mode 100644
index 0000000..f925c3d
--- /dev/null
+++ b/google/api_core/retry_async.py
@@ -0,0 +1,282 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Helpers for retrying coroutine functions with exponential back-off.
+
+The :class:`AsyncRetry` decorator shares most functionality and behavior with
+:class:`Retry`, but supports coroutine functions. Please refer to description
+of :class:`Retry` for more details.
+
+By default, this decorator will retry transient
+API errors (see :func:`if_transient_error`). For example:
+
+.. code-block:: python
+
+ @retry_async.AsyncRetry()
+ async def call_flaky_rpc():
+ return await client.flaky_rpc()
+
+ # Will retry flaky_rpc() if it raises transient API errors.
+ result = await call_flaky_rpc()
+
+You can pass a custom predicate to retry on different exceptions, such as
+waiting for an eventually consistent item to be available:
+
+.. code-block:: python
+
+ @retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
+ async def check_if_exists():
+ return await client.does_thing_exist()
+
+ is_available = await check_if_exists()
+
+Some client library methods apply retry automatically. These methods can accept
+a ``retry`` parameter that allows you to configure the behavior:
+
+.. code-block:: python
+
+ my_retry = retry_async.AsyncRetry(deadline=60)
+ result = await client.some_method(retry=my_retry)
+
+"""
+
+import asyncio
+import datetime
+import functools
+import logging
+
+from google.api_core import datetime_helpers, exceptions
+from google.api_core.retry import (exponential_sleep_generator, # noqa: F401
+ if_exception_type, if_transient_error)
+
+_LOGGER = logging.getLogger(__name__)
+_DEFAULT_INITIAL_DELAY = 1.0 # seconds
+_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds
+_DEFAULT_DELAY_MULTIPLIER = 2.0
+_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds
+
+
+async def retry_target(target, predicate, sleep_generator, deadline, on_error=None):
+ """Call a function and retry if it fails.
+
+ This is the lowest-level retry helper. Generally, you'll use the
+ higher-level retry helper :class:`Retry`.
+
+ Args:
+ target(Callable): The function to call and retry. This must be a
+ nullary function - apply arguments with `functools.partial`.
+ predicate (Callable[Exception]): A callable used to determine if an
+ exception raised by the target should be considered retryable.
+ It should return True to retry or False otherwise.
+ sleep_generator (Iterable[float]): An infinite iterator that determines
+ how long to sleep between retries.
+ deadline (float): How long to keep retrying the target. The last sleep
+ period is shortened as necessary, so that the last retry runs at
+ ``deadline`` (and not considerably beyond it).
+ on_error (Callable[Exception]): A function to call while processing a
+ retryable exception. Any error raised by this function will *not*
+ be caught.
+
+ Returns:
+ Any: the return value of the target function.
+
+ Raises:
+ google.api_core.RetryError: If the deadline is exceeded while retrying.
+ ValueError: If the sleep generator stops yielding values.
+ Exception: If the target raises a method that isn't retryable.
+ """
+ deadline_dt = (datetime_helpers.utcnow() + datetime.timedelta(seconds=deadline)) if deadline else None
+
+ last_exc = None
+
+ for sleep in sleep_generator:
+ try:
+ if not deadline_dt:
+ return await target()
+ else:
+ return await asyncio.wait_for(
+ target(),
+ timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds()
+ )
+ # pylint: disable=broad-except
+ # This function explicitly must deal with broad exceptions.
+ except Exception as exc:
+ if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError):
+ raise
+ last_exc = exc
+ if on_error is not None:
+ on_error(exc)
+
+ now = datetime_helpers.utcnow()
+
+ if deadline_dt:
+ if deadline_dt <= now:
+ # Chains the raising RetryError with the root cause error,
+ # which helps observability and debugability.
+ raise exceptions.RetryError(
+ "Deadline of {:.1f}s exceeded while calling {}".format(
+ deadline, target
+ ),
+ last_exc,
+ ) from last_exc
+ else:
+ time_to_deadline = (deadline_dt - now).total_seconds()
+ sleep = min(time_to_deadline, sleep)
+
+ _LOGGER.debug(
+ "Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
+ )
+ await asyncio.sleep(sleep)
+
+ raise ValueError("Sleep generator stopped yielding sleep values.")
+
+
+class AsyncRetry:
+ """Exponential retry decorator for async functions.
+
+ This class is a decorator used to add exponential back-off retry behavior
+ to an RPC call.
+
+ Although the default behavior is to retry transient API errors, a
+ different predicate can be provided to retry other exceptions.
+
+ Args:
+ predicate (Callable[Exception]): A callable that should return ``True``
+ if the given exception is retryable.
+ initial (float): The minimum a,out of time to delay in seconds. This
+ must be greater than 0.
+ maximum (float): The maximum amout of time to delay in seconds.
+ multiplier (float): The multiplier applied to the delay.
+ deadline (float): How long to keep retrying in seconds. The last sleep
+ period is shortened as necessary, so that the last retry runs at
+ ``deadline`` (and not considerably beyond it).
+ on_error (Callable[Exception]): A function to call while processing
+ a retryable exception. Any error raised by this function will
+ *not* be caught.
+ """
+
+ def __init__(
+ self,
+ predicate=if_transient_error,
+ initial=_DEFAULT_INITIAL_DELAY,
+ maximum=_DEFAULT_MAXIMUM_DELAY,
+ multiplier=_DEFAULT_DELAY_MULTIPLIER,
+ deadline=_DEFAULT_DEADLINE,
+ on_error=None,
+ ):
+ self._predicate = predicate
+ self._initial = initial
+ self._multiplier = multiplier
+ self._maximum = maximum
+ self._deadline = deadline
+ self._on_error = on_error
+
+ def __call__(self, func, on_error=None):
+ """Wrap a callable with retry behavior.
+
+ Args:
+ func (Callable): The callable to add retry behavior to.
+ on_error (Callable[Exception]): A function to call while processing
+ a retryable exception. Any error raised by this function will
+ *not* be caught.
+
+ Returns:
+ Callable: A callable that will invoke ``func`` with retry
+ behavior.
+ """
+ if self._on_error is not None:
+ on_error = self._on_error
+
+ @functools.wraps(func)
+ async def retry_wrapped_func(*args, **kwargs):
+ """A wrapper that calls target function with retry."""
+ target = functools.partial(func, *args, **kwargs)
+ sleep_generator = exponential_sleep_generator(
+ self._initial, self._maximum, multiplier=self._multiplier
+ )
+ return await retry_target(
+ target,
+ self._predicate,
+ sleep_generator,
+ self._deadline,
+ on_error=on_error,
+ )
+
+ return retry_wrapped_func
+
+ def _replace(self,
+ predicate=None,
+ initial=None,
+ maximum=None,
+ multiplier=None,
+ deadline=None,
+ on_error=None):
+ return AsyncRetry(
+ predicate=predicate or self._predicate,
+ initial=initial or self._initial,
+ maximum=maximum or self._maximum,
+ multiplier=multiplier or self._multiplier,
+ deadline=deadline or self._deadline,
+ on_error=on_error or self._on_error,
+ )
+
+ def with_deadline(self, deadline):
+ """Return a copy of this retry with the given deadline.
+
+ Args:
+ deadline (float): How long to keep retrying.
+
+ Returns:
+ AsyncRetry: A new retry instance with the given deadline.
+ """
+ return self._replace(deadline=deadline)
+
+ def with_predicate(self, predicate):
+ """Return a copy of this retry with the given predicate.
+
+ Args:
+ predicate (Callable[Exception]): A callable that should return
+ ``True`` if the given exception is retryable.
+
+ Returns:
+ AsyncRetry: A new retry instance with the given predicate.
+ """
+ return self._replace(predicate=predicate)
+
+ def with_delay(self, initial=None, maximum=None, multiplier=None):
+ """Return a copy of this retry with the given delay options.
+
+ Args:
+ initial (float): The minimum amout of time to delay. This must
+ be greater than 0.
+ maximum (float): The maximum amout of time to delay.
+ multiplier (float): The multiplier applied to the delay.
+
+ Returns:
+ AsyncRetry: A new retry instance with the given predicate.
+ """
+ return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)
+
+ def __str__(self):
+ return (
+ "<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, "
+ "multiplier={:.1f}, deadline={:.1f}, on_error={}>".format(
+ self._predicate,
+ self._initial,
+ self._maximum,
+ self._multiplier,
+ self._deadline,
+ self._on_error,
+ )
+ )
diff --git a/noxfile.py b/noxfile.py
index dfb1257..989bb9b 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -15,10 +15,23 @@
from __future__ import absolute_import
import os
import shutil
+import sys
# https://github.com/google/importlab/issues/25
import nox # pytype: disable=import-error
+_MINIMAL_ASYNCIO_SUPPORT_PYTHON_VERSION = [3, 6]
+
+
+def _greater_or_equal_than_36(version_string):
+ tokens = version_string.split('.')
+ for i, token in enumerate(tokens):
+ try:
+ tokens[i] = int(token)
+ except ValueError:
+ pass
+ return tokens >= [3, 6]
+
def default(session):
"""Default unit test session.
@@ -32,8 +45,9 @@
session.install("mock", "pytest", "pytest-cov", "grpcio >= 1.0.2")
session.install("-e", ".")
- # Run py.test against the unit tests.
- session.run(
+ pytest_args = [
+ "python",
+ "-m",
"py.test",
"--quiet",
"--cov=google.api_core",
@@ -43,8 +57,19 @@
"--cov-report=",
"--cov-fail-under=0",
os.path.join("tests", "unit"),
- *session.posargs
- )
+ ]
+ pytest_args.extend(session.posargs)
+
+ # Inject AsyncIO content, if version >= 3.6.
+ if _greater_or_equal_than_36(session.python):
+ session.install("asyncmock", "pytest-asyncio")
+
+ pytest_args.append("--cov=tests.asyncio")
+ pytest_args.append(os.path.join("tests", "asyncio"))
+ session.run(*pytest_args)
+ else:
+ # Run py.test against the unit tests.
+ session.run(*pytest_args)
@nox.session(python=["2.7", "3.5", "3.6", "3.7", "3.8"])
diff --git a/tests/asyncio/__init__.py b/tests/asyncio/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/asyncio/__init__.py
diff --git a/tests/asyncio/gapic/test_config_async.py b/tests/asyncio/gapic/test_config_async.py
new file mode 100644
index 0000000..1f6ea9e
--- /dev/null
+++ b/tests/asyncio/gapic/test_config_async.py
@@ -0,0 +1,87 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.api_core import exceptions
+from google.api_core.gapic_v1 import config_async
+
+
+INTERFACE_CONFIG = {
+ "retry_codes": {
+ "idempotent": ["DEADLINE_EXCEEDED", "UNAVAILABLE"],
+ "other": ["FAILED_PRECONDITION"],
+ "non_idempotent": [],
+ },
+ "retry_params": {
+ "default": {
+ "initial_retry_delay_millis": 1000,
+ "retry_delay_multiplier": 2.5,
+ "max_retry_delay_millis": 120000,
+ "initial_rpc_timeout_millis": 120000,
+ "rpc_timeout_multiplier": 1.0,
+ "max_rpc_timeout_millis": 120000,
+ "total_timeout_millis": 600000,
+ },
+ "other": {
+ "initial_retry_delay_millis": 1000,
+ "retry_delay_multiplier": 1,
+ "max_retry_delay_millis": 1000,
+ "initial_rpc_timeout_millis": 1000,
+ "rpc_timeout_multiplier": 1,
+ "max_rpc_timeout_millis": 1000,
+ "total_timeout_millis": 1000,
+ },
+ },
+ "methods": {
+ "AnnotateVideo": {
+ "timeout_millis": 60000,
+ "retry_codes_name": "idempotent",
+ "retry_params_name": "default",
+ },
+ "Other": {
+ "timeout_millis": 60000,
+ "retry_codes_name": "other",
+ "retry_params_name": "other",
+ },
+ "Plain": {"timeout_millis": 30000},
+ },
+}
+
+
+def test_create_method_configs():
+ method_configs = config_async.parse_method_configs(INTERFACE_CONFIG)
+
+ retry, timeout = method_configs["AnnotateVideo"]
+ assert retry._predicate(exceptions.DeadlineExceeded(None))
+ assert retry._predicate(exceptions.ServiceUnavailable(None))
+ assert retry._initial == 1.0
+ assert retry._multiplier == 2.5
+ assert retry._maximum == 120.0
+ assert retry._deadline == 600.0
+ assert timeout._initial == 120.0
+ assert timeout._multiplier == 1.0
+ assert timeout._maximum == 120.0
+
+ retry, timeout = method_configs["Other"]
+ assert retry._predicate(exceptions.FailedPrecondition(None))
+ assert retry._initial == 1.0
+ assert retry._multiplier == 1.0
+ assert retry._maximum == 1.0
+ assert retry._deadline == 1.0
+ assert timeout._initial == 1.0
+ assert timeout._multiplier == 1.0
+ assert timeout._maximum == 1.0
+
+ retry, timeout = method_configs["Plain"]
+ assert retry is None
+ assert timeout._timeout == 30.0
diff --git a/tests/asyncio/test_retry_async.py b/tests/asyncio/test_retry_async.py
new file mode 100644
index 0000000..8f86366
--- /dev/null
+++ b/tests/asyncio/test_retry_async.py
@@ -0,0 +1,397 @@
+# Copyright 2020 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import re
+
+import mock
+import pytest
+
+from google.api_core import exceptions
+from google.api_core import retry_async
+
+
[email protected]("asyncio.sleep", autospec=True)
[email protected](
+ "google.api_core.datetime_helpers.utcnow",
+ return_value=datetime.datetime.min,
+ autospec=True,
+)
[email protected]
+async def test_retry_target_success(utcnow, sleep):
+ predicate = retry_async.if_exception_type(ValueError)
+ call_count = [0]
+
+ async def target():
+ call_count[0] += 1
+ if call_count[0] < 3:
+ raise ValueError()
+ return 42
+
+ result = await retry_async.retry_target(target, predicate, range(10), None)
+
+ assert result == 42
+ assert call_count[0] == 3
+ sleep.assert_has_calls([mock.call(0), mock.call(1)])
+
+
[email protected]("asyncio.sleep", autospec=True)
[email protected](
+ "google.api_core.datetime_helpers.utcnow",
+ return_value=datetime.datetime.min,
+ autospec=True,
+)
[email protected]
+async def test_retry_target_w_on_error(utcnow, sleep):
+ predicate = retry_async.if_exception_type(ValueError)
+ call_count = {"target": 0}
+ to_raise = ValueError()
+
+ async def target():
+ call_count["target"] += 1
+ if call_count["target"] < 3:
+ raise to_raise
+ return 42
+
+ on_error = mock.Mock()
+
+ result = await retry_async.retry_target(target, predicate, range(10), None, on_error=on_error)
+
+ assert result == 42
+ assert call_count["target"] == 3
+
+ on_error.assert_has_calls([mock.call(to_raise), mock.call(to_raise)])
+ sleep.assert_has_calls([mock.call(0), mock.call(1)])
+
+
[email protected]("asyncio.sleep", autospec=True)
[email protected](
+ "google.api_core.datetime_helpers.utcnow",
+ return_value=datetime.datetime.min,
+ autospec=True,
+)
[email protected]
+async def test_retry_target_non_retryable_error(utcnow, sleep):
+ predicate = retry_async.if_exception_type(ValueError)
+ exception = TypeError()
+ target = mock.Mock(side_effect=exception)
+
+ with pytest.raises(TypeError) as exc_info:
+ await retry_async.retry_target(target, predicate, range(10), None)
+
+ assert exc_info.value == exception
+ sleep.assert_not_called()
+
+
[email protected]("asyncio.sleep", autospec=True)
[email protected]("google.api_core.datetime_helpers.utcnow", autospec=True)
[email protected]
+async def test_retry_target_deadline_exceeded(utcnow, sleep):
+ predicate = retry_async.if_exception_type(ValueError)
+ exception = ValueError("meep")
+ target = mock.Mock(side_effect=exception)
+ # Setup the timeline so that the first call takes 5 seconds but the second
+ # call takes 6, which puts the retry over the deadline.
+ utcnow.side_effect = [
+ # The first call to utcnow establishes the start of the timeline.
+ datetime.datetime.min,
+ datetime.datetime.min + datetime.timedelta(seconds=5),
+ datetime.datetime.min + datetime.timedelta(seconds=11),
+ ]
+
+ with pytest.raises(exceptions.RetryError) as exc_info:
+ await retry_async.retry_target(target, predicate, range(10), deadline=10)
+
+ assert exc_info.value.cause == exception
+ assert exc_info.match("Deadline of 10.0s exceeded")
+ assert exc_info.match("last exception: meep")
+ assert target.call_count == 2
+
+
[email protected]
+async def test_retry_target_bad_sleep_generator():
+ with pytest.raises(ValueError, match="Sleep generator"):
+ await retry_async.retry_target(mock.sentinel.target, mock.sentinel.predicate, [], None)
+
+
+class TestAsyncRetry:
+
+ def test_constructor_defaults(self):
+ retry_ = retry_async.AsyncRetry()
+ assert retry_._predicate == retry_async.if_transient_error
+ assert retry_._initial == 1
+ assert retry_._maximum == 60
+ assert retry_._multiplier == 2
+ assert retry_._deadline == 120
+ assert retry_._on_error is None
+
+ def test_constructor_options(self):
+ _some_function = mock.Mock()
+
+ retry_ = retry_async.AsyncRetry(
+ predicate=mock.sentinel.predicate,
+ initial=1,
+ maximum=2,
+ multiplier=3,
+ deadline=4,
+ on_error=_some_function,
+ )
+ assert retry_._predicate == mock.sentinel.predicate
+ assert retry_._initial == 1
+ assert retry_._maximum == 2
+ assert retry_._multiplier == 3
+ assert retry_._deadline == 4
+ assert retry_._on_error is _some_function
+
+ def test_with_deadline(self):
+ retry_ = retry_async.AsyncRetry(
+ predicate=mock.sentinel.predicate,
+ initial=1,
+ maximum=2,
+ multiplier=3,
+ deadline=4,
+ on_error=mock.sentinel.on_error,
+ )
+ new_retry = retry_.with_deadline(42)
+ assert retry_ is not new_retry
+ assert new_retry._deadline == 42
+
+ # the rest of the attributes should remain the same
+ assert new_retry._predicate is retry_._predicate
+ assert new_retry._initial == retry_._initial
+ assert new_retry._maximum == retry_._maximum
+ assert new_retry._multiplier == retry_._multiplier
+ assert new_retry._on_error is retry_._on_error
+
+ def test_with_predicate(self):
+ retry_ = retry_async.AsyncRetry(
+ predicate=mock.sentinel.predicate,
+ initial=1,
+ maximum=2,
+ multiplier=3,
+ deadline=4,
+ on_error=mock.sentinel.on_error,
+ )
+ new_retry = retry_.with_predicate(mock.sentinel.predicate)
+ assert retry_ is not new_retry
+ assert new_retry._predicate == mock.sentinel.predicate
+
+ # the rest of the attributes should remain the same
+ assert new_retry._deadline == retry_._deadline
+ assert new_retry._initial == retry_._initial
+ assert new_retry._maximum == retry_._maximum
+ assert new_retry._multiplier == retry_._multiplier
+ assert new_retry._on_error is retry_._on_error
+
+ def test_with_delay_noop(self):
+ retry_ = retry_async.AsyncRetry(
+ predicate=mock.sentinel.predicate,
+ initial=1,
+ maximum=2,
+ multiplier=3,
+ deadline=4,
+ on_error=mock.sentinel.on_error,
+ )
+ new_retry = retry_.with_delay()
+ assert retry_ is not new_retry
+ assert new_retry._initial == retry_._initial
+ assert new_retry._maximum == retry_._maximum
+ assert new_retry._multiplier == retry_._multiplier
+
+ def test_with_delay(self):
+ retry_ = retry_async.AsyncRetry(
+ predicate=mock.sentinel.predicate,
+ initial=1,
+ maximum=2,
+ multiplier=3,
+ deadline=4,
+ on_error=mock.sentinel.on_error,
+ )
+ new_retry = retry_.with_delay(initial=1, maximum=2, multiplier=3)
+ assert retry_ is not new_retry
+ assert new_retry._initial == 1
+ assert new_retry._maximum == 2
+ assert new_retry._multiplier == 3
+
+ # the rest of the attributes should remain the same
+ assert new_retry._deadline == retry_._deadline
+ assert new_retry._predicate is retry_._predicate
+ assert new_retry._on_error is retry_._on_error
+
+ def test___str__(self):
+ def if_exception_type(exc):
+ return bool(exc) # pragma: NO COVER
+
+ # Explicitly set all attributes as changed Retry defaults should not
+ # cause this test to start failing.
+ retry_ = retry_async.AsyncRetry(
+ predicate=if_exception_type,
+ initial=1.0,
+ maximum=60.0,
+ multiplier=2.0,
+ deadline=120.0,
+ on_error=None,
+ )
+ assert re.match(
+ (
+ r"<AsyncRetry predicate=<function.*?if_exception_type.*?>, "
+ r"initial=1.0, maximum=60.0, multiplier=2.0, deadline=120.0, "
+ r"on_error=None>"
+ ),
+ str(retry_),
+ )
+
+ @mock.patch("asyncio.sleep", autospec=True)
+ @pytest.mark.asyncio
+ async def test___call___and_execute_success(self, sleep):
+ retry_ = retry_async.AsyncRetry()
+ target = mock.AsyncMock(spec=["__call__"], return_value=42)
+ # __name__ is needed by functools.partial.
+ target.__name__ = "target"
+
+ decorated = retry_(target)
+ target.assert_not_called()
+
+ result = await decorated("meep")
+
+ assert result == 42
+ target.assert_called_once_with("meep")
+ sleep.assert_not_called()
+
+ # Make uniform return half of its maximum, which is the calculated sleep time.
+ @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n / 2.0)
+ @mock.patch("asyncio.sleep", autospec=True)
+ @pytest.mark.asyncio
+ async def test___call___and_execute_retry(self, sleep, uniform):
+
+ on_error = mock.Mock(spec=["__call__"], side_effect=[None])
+ retry_ = retry_async.AsyncRetry(predicate=retry_async.if_exception_type(ValueError))
+
+ target = mock.AsyncMock(spec=["__call__"], side_effect=[ValueError(), 42])
+ # __name__ is needed by functools.partial.
+ target.__name__ = "target"
+
+ decorated = retry_(target, on_error=on_error)
+ target.assert_not_called()
+
+ result = await decorated("meep")
+
+ assert result == 42
+ assert target.call_count == 2
+ target.assert_has_calls([mock.call("meep"), mock.call("meep")])
+ sleep.assert_called_once_with(retry_._initial)
+ assert on_error.call_count == 1
+
+ # Make uniform return half of its maximum, which is the calculated sleep time.
+ @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n / 2.0)
+ @mock.patch("asyncio.sleep", autospec=True)
+ @pytest.mark.asyncio
+ async def test___call___and_execute_retry_hitting_deadline(self, sleep, uniform):
+
+ on_error = mock.Mock(spec=["__call__"], side_effect=[None] * 10)
+ retry_ = retry_async.AsyncRetry(
+ predicate=retry_async.if_exception_type(ValueError),
+ initial=1.0,
+ maximum=1024.0,
+ multiplier=2.0,
+ deadline=9.9,
+ )
+
+ utcnow = datetime.datetime.utcnow()
+ utcnow_patcher = mock.patch(
+ "google.api_core.datetime_helpers.utcnow", return_value=utcnow
+ )
+
+ target = mock.AsyncMock(spec=["__call__"], side_effect=[ValueError()] * 10)
+ # __name__ is needed by functools.partial.
+ target.__name__ = "target"
+
+ decorated = retry_(target, on_error=on_error)
+ target.assert_not_called()
+
+ with utcnow_patcher as patched_utcnow:
+ # Make sure that calls to fake asyncio.sleep() also advance the mocked
+ # time clock.
+ def increase_time(sleep_delay):
+ patched_utcnow.return_value += datetime.timedelta(seconds=sleep_delay)
+ sleep.side_effect = increase_time
+
+ with pytest.raises(exceptions.RetryError):
+ await decorated("meep")
+
+ assert target.call_count == 5
+ target.assert_has_calls([mock.call("meep")] * 5)
+ assert on_error.call_count == 5
+
+ # check the delays
+ assert sleep.call_count == 4 # once between each successive target calls
+ last_wait = sleep.call_args.args[0]
+ total_wait = sum(call_args.args[0] for call_args in sleep.call_args_list)
+
+ assert last_wait == 2.9 # and not 8.0, because the last delay was shortened
+ assert total_wait == 9.9 # the same as the deadline
+
+ @mock.patch("asyncio.sleep", autospec=True)
+ @pytest.mark.asyncio
+ async def test___init___without_retry_executed(self, sleep):
+ _some_function = mock.Mock()
+
+ retry_ = retry_async.AsyncRetry(
+ predicate=retry_async.if_exception_type(ValueError), on_error=_some_function
+ )
+ # check the proper creation of the class
+ assert retry_._on_error is _some_function
+
+ target = mock.AsyncMock(spec=["__call__"], side_effect=[42])
+ # __name__ is needed by functools.partial.
+ target.__name__ = "target"
+
+ wrapped = retry_(target)
+
+ result = await wrapped("meep")
+
+ assert result == 42
+ target.assert_called_once_with("meep")
+ sleep.assert_not_called()
+ _some_function.assert_not_called()
+
+ # Make uniform return half of its maximum, which is the calculated sleep time.
+ @mock.patch("random.uniform", autospec=True, side_effect=lambda m, n: n / 2.0)
+ @mock.patch("asyncio.sleep", autospec=True)
+ @pytest.mark.asyncio
+ async def test___init___when_retry_is_executed(self, sleep, uniform):
+ _some_function = mock.Mock()
+
+ retry_ = retry_async.AsyncRetry(
+ predicate=retry_async.if_exception_type(ValueError), on_error=_some_function
+ )
+ # check the proper creation of the class
+ assert retry_._on_error is _some_function
+
+ target = mock.AsyncMock(
+ spec=["__call__"], side_effect=[ValueError(), ValueError(), 42]
+ )
+ # __name__ is needed by functools.partial.
+ target.__name__ = "target"
+
+ wrapped = retry_(target)
+ target.assert_not_called()
+
+ result = await wrapped("meep")
+
+ assert result == 42
+ assert target.call_count == 3
+ assert _some_function.call_count == 2
+ target.assert_has_calls([mock.call("meep"), mock.call("meep")])
+ sleep.assert_any_call(retry_._initial)