chore: blacken (#375)
diff --git a/tests/compute_engine/test__metadata.py b/tests/compute_engine/test__metadata.py
index bf48882..bd06b74 100644
--- a/tests/compute_engine/test__metadata.py
+++ b/tests/compute_engine/test__metadata.py
@@ -27,7 +27,7 @@
from google.auth import transport
from google.auth.compute_engine import _metadata
-PATH = 'instance/service-accounts/default'
+PATH = "instance/service-accounts/default"
def make_request(data, status=http_client.OK, headers=None):
@@ -43,35 +43,35 @@
def test_ping_success():
- request = make_request('', headers=_metadata._METADATA_HEADERS)
+ request = make_request("", headers=_metadata._METADATA_HEADERS)
assert _metadata.ping(request)
request.assert_called_once_with(
- method='GET',
+ method="GET",
url=_metadata._METADATA_IP_ROOT,
headers=_metadata._METADATA_HEADERS,
- timeout=_metadata._METADATA_DEFAULT_TIMEOUT)
+ timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
+ )
def test_ping_failure_bad_flavor():
- request = make_request(
- '', headers={_metadata._METADATA_FLAVOR_HEADER: 'meep'})
+ request = make_request("", headers={_metadata._METADATA_FLAVOR_HEADER: "meep"})
assert not _metadata.ping(request)
def test_ping_failure_connection_failed():
- request = make_request('')
+ request = make_request("")
request.side_effect = exceptions.TransportError()
assert not _metadata.ping(request)
def test_ping_success_custom_root():
- request = make_request('', headers=_metadata._METADATA_HEADERS)
+ request = make_request("", headers=_metadata._METADATA_HEADERS)
- fake_ip = '1.2.3.4'
+ fake_ip = "1.2.3.4"
os.environ[environment_vars.GCE_METADATA_IP] = fake_ip
reload_module(_metadata)
@@ -82,46 +82,47 @@
reload_module(_metadata)
request.assert_called_once_with(
- method='GET',
- url='http://' + fake_ip,
+ method="GET",
+ url="http://" + fake_ip,
headers=_metadata._METADATA_HEADERS,
- timeout=_metadata._METADATA_DEFAULT_TIMEOUT)
+ timeout=_metadata._METADATA_DEFAULT_TIMEOUT,
+ )
def test_get_success_json():
- key, value = 'foo', 'bar'
+ key, value = "foo", "bar"
data = json.dumps({key: value})
- request = make_request(
- data, headers={'content-type': 'application/json'})
+ request = make_request(data, headers={"content-type": "application/json"})
result = _metadata.get(request, PATH)
request.assert_called_once_with(
- method='GET',
+ method="GET",
url=_metadata._METADATA_ROOT + PATH,
- headers=_metadata._METADATA_HEADERS)
+ headers=_metadata._METADATA_HEADERS,
+ )
assert result[key] == value
def test_get_success_text():
- data = 'foobar'
- request = make_request(data, headers={'content-type': 'text/plain'})
+ data = "foobar"
+ request = make_request(data, headers={"content-type": "text/plain"})
result = _metadata.get(request, PATH)
request.assert_called_once_with(
- method='GET',
+ method="GET",
url=_metadata._METADATA_ROOT + PATH,
- headers=_metadata._METADATA_HEADERS)
+ headers=_metadata._METADATA_HEADERS,
+ )
assert result == data
def test_get_success_custom_root():
- request = make_request(
- '{}', headers={'content-type': 'application/json'})
+ request = make_request("{}", headers={"content-type": "application/json"})
- fake_root = 'another.metadata.service'
+ fake_root = "another.metadata.service"
os.environ[environment_vars.GCE_METADATA_ROOT] = fake_root
reload_module(_metadata)
@@ -132,83 +133,87 @@
reload_module(_metadata)
request.assert_called_once_with(
- method='GET',
- url='http://{}/computeMetadata/v1/{}'.format(fake_root, PATH),
- headers=_metadata._METADATA_HEADERS)
+ method="GET",
+ url="http://{}/computeMetadata/v1/{}".format(fake_root, PATH),
+ headers=_metadata._METADATA_HEADERS,
+ )
def test_get_failure():
- request = make_request(
- 'Metadata error', status=http_client.NOT_FOUND)
+ request = make_request("Metadata error", status=http_client.NOT_FOUND)
with pytest.raises(exceptions.TransportError) as excinfo:
_metadata.get(request, PATH)
- assert excinfo.match(r'Metadata error')
+ assert excinfo.match(r"Metadata error")
request.assert_called_once_with(
- method='GET',
+ method="GET",
url=_metadata._METADATA_ROOT + PATH,
- headers=_metadata._METADATA_HEADERS)
+ headers=_metadata._METADATA_HEADERS,
+ )
def test_get_failure_bad_json():
- request = make_request(
- '{', headers={'content-type': 'application/json'})
+ request = make_request("{", headers={"content-type": "application/json"})
with pytest.raises(exceptions.TransportError) as excinfo:
_metadata.get(request, PATH)
- assert excinfo.match(r'invalid JSON')
+ assert excinfo.match(r"invalid JSON")
request.assert_called_once_with(
- method='GET',
+ method="GET",
url=_metadata._METADATA_ROOT + PATH,
- headers=_metadata._METADATA_HEADERS)
+ headers=_metadata._METADATA_HEADERS,
+ )
def test_get_project_id():
- project = 'example-project'
- request = make_request(
- project, headers={'content-type': 'text/plain'})
+ project = "example-project"
+ request = make_request(project, headers={"content-type": "text/plain"})
project_id = _metadata.get_project_id(request)
request.assert_called_once_with(
- method='GET',
- url=_metadata._METADATA_ROOT + 'project/project-id',
- headers=_metadata._METADATA_HEADERS)
+ method="GET",
+ url=_metadata._METADATA_ROOT + "project/project-id",
+ headers=_metadata._METADATA_HEADERS,
+ )
assert project_id == project
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_get_service_account_token(utcnow):
ttl = 500
request = make_request(
- json.dumps({'access_token': 'token', 'expires_in': ttl}),
- headers={'content-type': 'application/json'})
+ json.dumps({"access_token": "token", "expires_in": ttl}),
+ headers={"content-type": "application/json"},
+ )
token, expiry = _metadata.get_service_account_token(request)
request.assert_called_once_with(
- method='GET',
- url=_metadata._METADATA_ROOT + PATH + '/token',
- headers=_metadata._METADATA_HEADERS)
- assert token == 'token'
+ method="GET",
+ url=_metadata._METADATA_ROOT + PATH + "/token",
+ headers=_metadata._METADATA_HEADERS,
+ )
+ assert token == "token"
assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
def test_get_service_account_info():
- key, value = 'foo', 'bar'
+ key, value = "foo", "bar"
request = make_request(
- json.dumps({key: value}),
- headers={'content-type': 'application/json'})
+ json.dumps({key: value}), headers={"content-type": "application/json"}
+ )
info = _metadata.get_service_account_info(request)
request.assert_called_once_with(
- method='GET',
- url=_metadata._METADATA_ROOT + PATH + '/?recursive=true',
- headers=_metadata._METADATA_HEADERS)
+ method="GET",
+ url=_metadata._METADATA_ROOT + PATH + "/?recursive=true",
+ headers=_metadata._METADATA_HEADERS,
+ )
assert info[key] == value
diff --git a/tests/compute_engine/test_credentials.py b/tests/compute_engine/test_credentials.py
index ee415db..ec9d13b 100644
--- a/tests/compute_engine/test_credentials.py
+++ b/tests/compute_engine/test_credentials.py
@@ -38,68 +38,72 @@
# Scopes aren't needed
assert not self.credentials.requires_scopes
# Service account email hasn't been populated
- assert self.credentials.service_account_email == 'default'
+ assert self.credentials.service_account_email == "default"
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_success(self, get, utcnow):
- get.side_effect = [{
- # First request is for sevice account info.
- 'email': '[email protected]',
- 'scopes': ['one', 'two']
- }, {
- # Second request is for the token.
- 'access_token': 'token',
- 'expires_in': 500
- }]
+ get.side_effect = [
+ {
+ # First request is for service account info.
+ "email": "[email protected]",
+ "scopes": ["one", "two"],
+ },
+ {
+ # Second request is for the token.
+ "access_token": "token",
+ "expires_in": 500,
+ },
+ ]
# Refresh credentials
self.credentials.refresh(None)
# Check that the credentials have the token and proper expiration
- assert self.credentials.token == 'token'
- assert self.credentials.expiry == (
- utcnow() + datetime.timedelta(seconds=500))
+ assert self.credentials.token == "token"
+ assert self.credentials.expiry == (utcnow() + datetime.timedelta(seconds=500))
# Check the credential info
- assert (self.credentials.service_account_email ==
- '[email protected]')
- assert self.credentials._scopes == ['one', 'two']
+ assert self.credentials.service_account_email == "[email protected]"
+ assert self.credentials._scopes == ["one", "two"]
# Check that the credentials are valid (have a token and are not
# expired)
assert self.credentials.valid
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_refresh_error(self, get):
- get.side_effect = exceptions.TransportError('http error')
+ get.side_effect = exceptions.TransportError("http error")
with pytest.raises(exceptions.RefreshError) as excinfo:
self.credentials.refresh(None)
- assert excinfo.match(r'http error')
+ assert excinfo.match(r"http error")
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_before_request_refreshes(self, get):
- get.side_effect = [{
- # First request is for sevice account info.
- 'email': '[email protected]',
- 'scopes': 'one two'
- }, {
- # Second request is for the token.
- 'access_token': 'token',
- 'expires_in': 500
- }]
+ get.side_effect = [
+ {
+ # First request is for service account info.
+ "email": "[email protected]",
+ "scopes": "one two",
+ },
+ {
+ # Second request is for the token.
+ "access_token": "token",
+ "expires_in": 500,
+ },
+ ]
# Credentials should start as invalid
assert not self.credentials.valid
# before_request should cause a refresh
request = mock.create_autospec(transport.Request, instance=True)
- self.credentials.before_request(
- request, 'GET', 'http://example.com?a=1#3', {})
+ self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
# The refresh endpoint should've been called.
assert get.called
@@ -111,201 +115,207 @@
class TestIDTokenCredentials(object):
credentials = None
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_default_state(self, get):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scope': ['one', 'two'],
- }]
+ get.side_effect = [
+ {"email": "[email protected]", "scope": ["one", "two"]}
+ ]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://example.com")
+ request=request, target_audience="https://example.com"
+ )
assert not self.credentials.valid
# Expiration hasn't been set yet
assert not self.credentials.expired
# Service account email hasn't been populated
- assert (self.credentials.service_account_email
- == '[email protected]')
+ assert self.credentials.service_account_email == "[email protected]"
# Signer is initialized
assert self.credentials.signer
- assert self.credentials.signer_email == '[email protected]'
+ assert self.credentials.signer_email == "[email protected]"
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_make_authorization_grant_assertion(self, sign, get, utcnow):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': ['one', 'two']
- }]
- sign.side_effect = [b'signature']
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": ["one", "two"]}
+ ]
+ sign.side_effect = [b"signature"]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com")
+ request=request, target_audience="https://audience.com"
+ )
# Generate authorization grant:
token = self.credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, verify=False)
# The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b'.c2lnbmF0dXJl')
+ assert token.endswith(b".c2lnbmF0dXJl")
# Check that the credentials have the token and proper expiration
assert payload == {
- 'aud': 'https://www.googleapis.com/oauth2/v4/token',
- 'exp': 3600,
- 'iat': 0,
- 'iss': '[email protected]',
- 'target_audience': 'https://audience.com'}
+ "aud": "https://www.googleapis.com/oauth2/v4/token",
+ "exp": 3600,
+ "iat": 0,
+ "iss": "[email protected]",
+ "target_audience": "https://audience.com",
+ }
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_service_account(self, sign, get, utcnow):
- sign.side_effect = [b'signature']
+ sign.side_effect = [b"signature"]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com",
- service_account_email="[email protected]")
+ request=request,
+ target_audience="https://audience.com",
+ service_account_email="[email protected]",
+ )
# Generate authorization grant:
token = self.credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, verify=False)
# The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b'.c2lnbmF0dXJl')
+ assert token.endswith(b".c2lnbmF0dXJl")
# Check that the credentials have the token and proper expiration
assert payload == {
- 'aud': 'https://www.googleapis.com/oauth2/v4/token',
- 'exp': 3600,
- 'iat': 0,
- 'iss': '[email protected]',
- 'target_audience': 'https://audience.com'}
+ "aud": "https://www.googleapis.com/oauth2/v4/token",
+ "exp": 3600,
+ "iat": 0,
+ "iss": "[email protected]",
+ "target_audience": "https://audience.com",
+ }
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_additional_claims(self, sign, get, utcnow):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': ['one', 'two']
- }]
- sign.side_effect = [b'signature']
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": ["one", "two"]}
+ ]
+ sign.side_effect = [b"signature"]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com",
- additional_claims={'foo': 'bar'})
+ request=request,
+ target_audience="https://audience.com",
+ additional_claims={"foo": "bar"},
+ )
# Generate authorization grant:
token = self.credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, verify=False)
# The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b'.c2lnbmF0dXJl')
+ assert token.endswith(b".c2lnbmF0dXJl")
# Check that the credentials have the token and proper expiration
assert payload == {
- 'aud': 'https://www.googleapis.com/oauth2/v4/token',
- 'exp': 3600,
- 'iat': 0,
- 'iss': '[email protected]',
- 'target_audience': 'https://audience.com',
- 'foo': 'bar'}
+ "aud": "https://www.googleapis.com/oauth2/v4/token",
+ "exp": 3600,
+ "iat": 0,
+ "iss": "[email protected]",
+ "target_audience": "https://audience.com",
+ "foo": "bar",
+ }
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_target_audience(self, sign, get, utcnow):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': ['one', 'two']
- }]
- sign.side_effect = [b'signature']
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": ["one", "two"]}
+ ]
+ sign.side_effect = [b"signature"]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com")
- self.credentials = (
- self.credentials.with_target_audience("https://actually.not"))
+ request=request, target_audience="https://audience.com"
+ )
+ self.credentials = self.credentials.with_target_audience("https://actually.not")
# Generate authorization grant:
token = self.credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, verify=False)
# The JWT token signature is 'signature' encoded in base 64:
- assert token.endswith(b'.c2lnbmF0dXJl')
+ assert token.endswith(b".c2lnbmF0dXJl")
# Check that the credentials have the token and proper expiration
assert payload == {
- 'aud': 'https://www.googleapis.com/oauth2/v4/token',
- 'exp': 3600,
- 'iat': 0,
- 'iss': '[email protected]',
- 'target_audience': 'https://actually.not'}
+ "aud": "https://www.googleapis.com/oauth2/v4/token",
+ "exp": 3600,
+ "iat": 0,
+ "iss": "[email protected]",
+ "target_audience": "https://actually.not",
+ }
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
- @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': ['one', 'two']
- }]
- sign.side_effect = [b'signature']
- id_token_jwt_grant.side_effect = [(
- 'idtoken',
- datetime.datetime.utcfromtimestamp(3600),
- {},
- )]
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": ["one", "two"]}
+ ]
+ sign.side_effect = [b"signature"]
+ id_token_jwt_grant.side_effect = [
+ ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
+ ]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com")
+ request=request, target_audience="https://audience.com"
+ )
# Refresh credentials
self.credentials.refresh(None)
# Check that the credentials have the token and proper expiration
- assert self.credentials.token == 'idtoken'
- assert self.credentials.expiry == (
- datetime.datetime.utcfromtimestamp(3600))
+ assert self.credentials.token == "idtoken"
+ assert self.credentials.expiry == (datetime.datetime.utcfromtimestamp(3600))
# Check the credential info
- assert (self.credentials.service_account_email ==
- '[email protected]')
+ assert self.credentials.service_account_email == "[email protected]"
# Check that the credentials are valid (have a token and are not
# expired)
assert self.credentials.valid
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_refresh_error(self, sign, get, utcnow):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': ['one', 'two'],
- }]
- sign.side_effect = [b'signature']
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": ["one", "two"]}
+ ]
+ sign.side_effect = [b"signature"]
request = mock.create_autospec(transport.Request, instance=True)
response = mock.Mock()
@@ -314,43 +324,41 @@
request.side_effect = [response]
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com")
+ request=request, target_audience="https://audience.com"
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
self.credentials.refresh(request)
- assert excinfo.match(r'http error')
+ assert excinfo.match(r"http error")
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.utcfromtimestamp(0))
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
- @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
- def test_before_request_refreshes(
- self, id_token_jwt_grant, sign, get, utcnow):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': 'one two'
- }]
- sign.side_effect = [b'signature']
- id_token_jwt_grant.side_effect = [(
- 'idtoken',
- datetime.datetime.utcfromtimestamp(3600),
- {},
- )]
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.utcfromtimestamp(0),
+ )
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
+ def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow):
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": "one two"}
+ ]
+ sign.side_effect = [b"signature"]
+ id_token_jwt_grant.side_effect = [
+ ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
+ ]
request = mock.create_autospec(transport.Request, instance=True)
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com")
+ request=request, target_audience="https://audience.com"
+ )
# Credentials should start as invalid
assert not self.credentials.valid
# before_request should cause a refresh
request = mock.create_autospec(transport.Request, instance=True)
- self.credentials.before_request(
- request, 'GET', 'http://example.com?a=1#3', {})
+ self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
# The refresh endpoint should've been called.
assert get.called
@@ -358,14 +366,13 @@
# Credentials should now be valid.
assert self.credentials.valid
- @mock.patch('google.auth.compute_engine._metadata.get', autospec=True)
- @mock.patch('google.auth.iam.Signer.sign', autospec=True)
+ @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+ @mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_sign_bytes(self, sign, get):
- get.side_effect = [{
- 'email': '[email protected]',
- 'scopes': ['one', 'two']
- }]
- sign.side_effect = [b'signature']
+ get.side_effect = [
+ {"email": "[email protected]", "scopes": ["one", "two"]}
+ ]
+ sign.side_effect = [b"signature"]
request = mock.create_autospec(transport.Request, instance=True)
response = mock.Mock()
@@ -374,10 +381,11 @@
request.side_effect = [response]
self.credentials = credentials.IDTokenCredentials(
- request=request, target_audience="https://audience.com")
+ request=request, target_audience="https://audience.com"
+ )
# Generate authorization grant:
signature = self.credentials.sign_bytes(b"some bytes")
# The JWT token signature is 'signature' encoded in base 64:
- assert signature == b'signature'
+ assert signature == b"signature"
diff --git a/tests/conftest.py b/tests/conftest.py
index c9e3f84..2ccc132 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -24,14 +24,14 @@
Additionally mocks any non-existing modules specified in the dotted path.
"""
+
def _mock_non_existent_module(path):
- parts = path.split('.')
+ parts = path.split(".")
partial = []
for part in parts:
partial.append(part)
- current_module = '.'.join(partial)
+ current_module = ".".join(partial)
if current_module not in sys.modules:
- monkeypatch.setitem(
- sys.modules, current_module, mock.MagicMock())
+ monkeypatch.setitem(sys.modules, current_module, mock.MagicMock())
return _mock_non_existent_module
diff --git a/tests/crypt/test__cryptography_rsa.py b/tests/crypt/test__cryptography_rsa.py
index a7ebb64..10f926b 100644
--- a/tests/crypt/test__cryptography_rsa.py
+++ b/tests/crypt/test__cryptography_rsa.py
@@ -23,21 +23,21 @@
from google.auth.crypt import base
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
# To generate privatekey.pem, privatekey.pub, and public_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out public_cert.pem \
# > -keyout privatekey.pem
# $ openssl rsa -in privatekey.pem -pubout -out privatekey.pub
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
PKCS1_KEY_BYTES = PRIVATE_KEY_BYTES
-with open(os.path.join(DATA_DIR, 'privatekey.pub'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pub"), "rb") as fh:
PUBLIC_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'public_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
PUBLIC_CERT_BYTES = fh.read()
# To generate pem_from_pkcs12.pem and privatekey.p12:
@@ -46,22 +46,22 @@
# $ openssl pkcs12 -in privatekey.p12 -nocerts -nodes \
# > -out pem_from_pkcs12.pem
-with open(os.path.join(DATA_DIR, 'pem_from_pkcs12.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "pem_from_pkcs12.pem"), "rb") as fh:
PKCS8_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'privatekey.p12'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.p12"), "rb") as fh:
PKCS12_KEY_BYTES = fh.read()
# The service account JSON file can be generated from the Google Cloud Console.
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
class TestRSAVerifier(object):
def test_verify_success(self):
- to_sign = b'foo'
+ to_sign = b"foo"
signer = _cryptography_rsa.RSASigner.from_string(PRIVATE_KEY_BYTES)
actual_signature = signer.sign(to_sign)
@@ -69,7 +69,7 @@
assert verifier.verify(to_sign, actual_signature)
def test_verify_unicode_success(self):
- to_sign = u'foo'
+ to_sign = u"foo"
signer = _cryptography_rsa.RSASigner.from_string(PRIVATE_KEY_BYTES)
actual_signature = signer.sign(to_sign)
@@ -78,10 +78,10 @@
def test_verify_failure(self):
verifier = _cryptography_rsa.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
- bad_signature1 = b''
- assert not verifier.verify(b'foo', bad_signature1)
- bad_signature2 = b'a'
- assert not verifier.verify(b'foo', bad_signature2)
+ bad_signature1 = b""
+ assert not verifier.verify(b"foo", bad_signature1)
+ bad_signature2 = b"a"
+ assert not verifier.verify(b"foo", bad_signature2)
def test_from_string_pub_key(self):
verifier = _cryptography_rsa.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
@@ -134,16 +134,16 @@
_cryptography_rsa.RSASigner.from_string(PKCS12_KEY_BYTES)
def test_from_string_bogus_key(self):
- key_bytes = 'bogus-key'
+ key_bytes = "bogus-key"
with pytest.raises(ValueError):
_cryptography_rsa.RSASigner.from_string(key_bytes)
def test_from_service_account_info(self):
signer = _cryptography_rsa.RSASigner.from_service_account_info(
- SERVICE_ACCOUNT_INFO)
+ SERVICE_ACCOUNT_INFO
+ )
- assert signer.key_id == SERVICE_ACCOUNT_INFO[
- base._JSON_FILE_PRIVATE_KEY_ID]
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
assert isinstance(signer._key, rsa.RSAPrivateKey)
def test_from_service_account_info_missing_key(self):
@@ -154,8 +154,8 @@
def test_from_service_account_file(self):
signer = _cryptography_rsa.RSASigner.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE)
+ SERVICE_ACCOUNT_JSON_FILE
+ )
- assert signer.key_id == SERVICE_ACCOUNT_INFO[
- base._JSON_FILE_PRIVATE_KEY_ID]
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
assert isinstance(signer._key, rsa.RSAPrivateKey)
diff --git a/tests/crypt/test__python_rsa.py b/tests/crypt/test__python_rsa.py
index d13105f..08b9503 100644
--- a/tests/crypt/test__python_rsa.py
+++ b/tests/crypt/test__python_rsa.py
@@ -26,21 +26,21 @@
from google.auth.crypt import base
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
# To generate privatekey.pem, privatekey.pub, and public_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out public_cert.pem \
# > -keyout privatekey.pem
# $ openssl rsa -in privatekey.pem -pubout -out privatekey.pub
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
PKCS1_KEY_BYTES = PRIVATE_KEY_BYTES
-with open(os.path.join(DATA_DIR, 'privatekey.pub'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pub"), "rb") as fh:
PUBLIC_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'public_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
PUBLIC_CERT_BYTES = fh.read()
# To generate pem_from_pkcs12.pem and privatekey.p12:
@@ -49,22 +49,22 @@
# $ openssl pkcs12 -in privatekey.p12 -nocerts -nodes \
# > -out pem_from_pkcs12.pem
-with open(os.path.join(DATA_DIR, 'pem_from_pkcs12.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "pem_from_pkcs12.pem"), "rb") as fh:
PKCS8_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'privatekey.p12'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.p12"), "rb") as fh:
PKCS12_KEY_BYTES = fh.read()
# The service account JSON file can be generated from the Google Cloud Console.
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
class TestRSAVerifier(object):
def test_verify_success(self):
- to_sign = b'foo'
+ to_sign = b"foo"
signer = _python_rsa.RSASigner.from_string(PRIVATE_KEY_BYTES)
actual_signature = signer.sign(to_sign)
@@ -72,7 +72,7 @@
assert verifier.verify(to_sign, actual_signature)
def test_verify_unicode_success(self):
- to_sign = u'foo'
+ to_sign = u"foo"
signer = _python_rsa.RSASigner.from_string(PRIVATE_KEY_BYTES)
actual_signature = signer.sign(to_sign)
@@ -81,10 +81,10 @@
def test_verify_failure(self):
verifier = _python_rsa.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
- bad_signature1 = b''
- assert not verifier.verify(b'foo', bad_signature1)
- bad_signature2 = b'a'
- assert not verifier.verify(b'foo', bad_signature2)
+ bad_signature1 = b""
+ assert not verifier.verify(b"foo", bad_signature1)
+ bad_signature2 = b"a"
+ assert not verifier.verify(b"foo", bad_signature2)
def test_from_string_pub_key(self):
verifier = _python_rsa.RSAVerifier.from_string(PUBLIC_KEY_BYTES)
@@ -110,15 +110,15 @@
def test_from_string_pub_cert_failure(self):
cert_bytes = PUBLIC_CERT_BYTES
- true_der = rsa.pem.load_pem(cert_bytes, 'CERTIFICATE')
+ true_der = rsa.pem.load_pem(cert_bytes, "CERTIFICATE")
load_pem_patch = mock.patch(
- 'rsa.pem.load_pem', return_value=true_der + b'extra',
- autospec=True)
+ "rsa.pem.load_pem", return_value=true_der + b"extra", autospec=True
+ )
with load_pem_patch as load_pem:
with pytest.raises(ValueError):
_python_rsa.RSAVerifier.from_string(cert_bytes)
- load_pem.assert_called_once_with(cert_bytes, 'CERTIFICATE')
+ load_pem.assert_called_once_with(cert_bytes, "CERTIFICATE")
class TestRSASigner(object):
@@ -141,21 +141,21 @@
def test_from_string_pkcs8_extra_bytes(self):
key_bytes = PKCS8_KEY_BYTES
_, pem_bytes = pem.readPemBlocksFromFile(
- six.StringIO(_helpers.from_bytes(key_bytes)),
- _python_rsa._PKCS8_MARKER)
+ six.StringIO(_helpers.from_bytes(key_bytes)), _python_rsa._PKCS8_MARKER
+ )
- key_info, remaining = None, 'extra'
+ key_info, remaining = None, "extra"
decode_patch = mock.patch(
- 'pyasn1.codec.der.decoder.decode',
+ "pyasn1.codec.der.decoder.decode",
return_value=(key_info, remaining),
- autospec=True)
+ autospec=True,
+ )
with decode_patch as decode:
with pytest.raises(ValueError):
_python_rsa.RSASigner.from_string(key_bytes)
# Verify mock was called.
- decode.assert_called_once_with(
- pem_bytes, asn1Spec=_python_rsa._PKCS8_SPEC)
+ decode.assert_called_once_with(pem_bytes, asn1Spec=_python_rsa._PKCS8_SPEC)
def test_from_string_pkcs8_unicode(self):
key_bytes = _helpers.from_bytes(PKCS8_KEY_BYTES)
@@ -168,16 +168,14 @@
_python_rsa.RSASigner.from_string(PKCS12_KEY_BYTES)
def test_from_string_bogus_key(self):
- key_bytes = 'bogus-key'
+ key_bytes = "bogus-key"
with pytest.raises(ValueError):
_python_rsa.RSASigner.from_string(key_bytes)
def test_from_service_account_info(self):
- signer = _python_rsa.RSASigner.from_service_account_info(
- SERVICE_ACCOUNT_INFO)
+ signer = _python_rsa.RSASigner.from_service_account_info(SERVICE_ACCOUNT_INFO)
- assert signer.key_id == SERVICE_ACCOUNT_INFO[
- base._JSON_FILE_PRIVATE_KEY_ID]
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
assert isinstance(signer._key, rsa.key.PrivateKey)
def test_from_service_account_info_missing_key(self):
@@ -188,8 +186,8 @@
def test_from_service_account_file(self):
signer = _python_rsa.RSASigner.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE)
+ SERVICE_ACCOUNT_JSON_FILE
+ )
- assert signer.key_id == SERVICE_ACCOUNT_INFO[
- base._JSON_FILE_PRIVATE_KEY_ID]
+ assert signer.key_id == SERVICE_ACCOUNT_INFO[base._JSON_FILE_PRIVATE_KEY_ID]
assert isinstance(signer._key, rsa.key.PrivateKey)
diff --git a/tests/crypt/test_crypt.py b/tests/crypt/test_crypt.py
index d8b1d00..16ff2e0 100644
--- a/tests/crypt/test_crypt.py
+++ b/tests/crypt/test_crypt.py
@@ -17,43 +17,42 @@
from google.auth import crypt
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
# To generate privatekey.pem, privatekey.pub, and public_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out public_cert.pem \
# > -keyout privatekey.pem
# $ openssl rsa -in privatekey.pem -pubout -out privatekey.pub
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'public_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
PUBLIC_CERT_BYTES = fh.read()
# To generate other_cert.pem:
# $ openssl req -new -newkey rsa:1024 -x509 -nodes -out other_cert.pem
-with open(os.path.join(DATA_DIR, 'other_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
OTHER_CERT_BYTES = fh.read()
def test_verify_signature():
- to_sign = b'foo'
+ to_sign = b"foo"
signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES)
signature = signer.sign(to_sign)
- assert crypt.verify_signature(
- to_sign, signature, PUBLIC_CERT_BYTES)
+ assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
# List of certs
assert crypt.verify_signature(
- to_sign, signature, [OTHER_CERT_BYTES, PUBLIC_CERT_BYTES])
+ to_sign, signature, [OTHER_CERT_BYTES, PUBLIC_CERT_BYTES]
+ )
def test_verify_signature_failure():
- to_sign = b'foo'
+ to_sign = b"foo"
signer = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES)
signature = signer.sign(to_sign)
- assert not crypt.verify_signature(
- to_sign, signature, OTHER_CERT_BYTES)
+ assert not crypt.verify_signature(to_sign, signature, OTHER_CERT_BYTES)
diff --git a/tests/oauth2/test__client.py b/tests/oauth2/test__client.py
index 6fc4c3b..c415a1f 100644
--- a/tests/oauth2/test__client.py
+++ b/tests/oauth2/test__client.py
@@ -30,42 +30,44 @@
from google.oauth2 import _client
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
-SCOPES_AS_LIST = ['https://www.googleapis.com/auth/pubsub',
- 'https://www.googleapis.com/auth/logging.write']
-SCOPES_AS_STRING = ('https://www.googleapis.com/auth/pubsub'
- ' https://www.googleapis.com/auth/logging.write')
+SCOPES_AS_LIST = [
+ "https://www.googleapis.com/auth/pubsub",
+ "https://www.googleapis.com/auth/logging.write",
+]
+SCOPES_AS_STRING = (
+ "https://www.googleapis.com/auth/pubsub"
+ " https://www.googleapis.com/auth/logging.write"
+)
def test__handle_error_response():
- response_data = json.dumps({
- 'error': 'help',
- 'error_description': 'I\'m alive'})
+ response_data = json.dumps({"error": "help", "error_description": "I'm alive"})
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._handle_error_response(response_data)
- assert excinfo.match(r'help: I\'m alive')
+ assert excinfo.match(r"help: I\'m alive")
def test__handle_error_response_non_json():
- response_data = 'Help, I\'m alive'
+ response_data = "Help, I'm alive"
with pytest.raises(exceptions.RefreshError) as excinfo:
_client._handle_error_response(response_data)
- assert excinfo.match(r'Help, I\'m alive')
+ assert excinfo.match(r"Help, I\'m alive")
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test__parse_expiry(unused_utcnow):
- result = _client._parse_expiry({'expires_in': 500})
+ result = _client._parse_expiry({"expires_in": 500})
assert result == datetime.datetime.min + datetime.timedelta(seconds=500)
@@ -76,188 +78,214 @@
def make_request(response_data, status=http_client.OK):
response = mock.create_autospec(transport.Response, instance=True)
response.status = status
- response.data = json.dumps(response_data).encode('utf-8')
+ response.data = json.dumps(response_data).encode("utf-8")
request = mock.create_autospec(transport.Request)
request.return_value = response
return request
def test__token_endpoint_request():
- request = make_request({'test': 'response'})
+ request = make_request({"test": "response"})
result = _client._token_endpoint_request(
- request, 'http://example.com', {'test': 'params'})
+ request, "http://example.com", {"test": "params"}
+ )
# Check request call
request.assert_called_with(
- method='POST',
- url='http://example.com',
- headers={'content-type': 'application/x-www-form-urlencoded'},
- body='test=params')
+ method="POST",
+ url="http://example.com",
+ headers={"content-type": "application/x-www-form-urlencoded"},
+ body="test=params",
+ )
# Check result
- assert result == {'test': 'response'}
+ assert result == {"test": "response"}
def test__token_endpoint_request_error():
request = make_request({}, status=http_client.BAD_REQUEST)
with pytest.raises(exceptions.RefreshError):
- _client._token_endpoint_request(request, 'http://example.com', {})
+ _client._token_endpoint_request(request, "http://example.com", {})
def test__token_endpoint_request_internal_failure_error():
- request = make_request({'error': 'internal_failure',
- 'error_description': 'internal_failure'},
- status=http_client.BAD_REQUEST)
+ request = make_request(
+ {"error": "internal_failure", "error_description": "internal_failure"},
+ status=http_client.BAD_REQUEST,
+ )
with pytest.raises(exceptions.RefreshError):
_client._token_endpoint_request(
- request, 'http://example.com',
- {'error': 'internal_failure',
- 'error_description': 'internal_failure'})
+ request,
+ "http://example.com",
+ {"error": "internal_failure", "error_description": "internal_failure"},
+ )
def verify_request_params(request, params):
- request_body = request.call_args[1]['body']
+ request_body = request.call_args[1]["body"]
request_params = urllib.parse.parse_qs(request_body)
for key, value in six.iteritems(params):
assert request_params[key][0] == value
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_jwt_grant(utcnow):
- request = make_request({
- 'access_token': 'token',
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {"access_token": "token", "expires_in": 500, "extra": "data"}
+ )
token, expiry, extra_data = _client.jwt_grant(
- request, 'http://example.com', 'assertion_value')
+ request, "http://example.com", "assertion_value"
+ )
# Check request call
- verify_request_params(request, {
- 'grant_type': _client._JWT_GRANT_TYPE,
- 'assertion': 'assertion_value'
- })
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
# Check result
- assert token == 'token'
+ assert token == "token"
assert expiry == utcnow() + datetime.timedelta(seconds=500)
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
def test_jwt_grant_no_access_token():
- request = make_request({
- # No access token.
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
with pytest.raises(exceptions.RefreshError):
- _client.jwt_grant(request, 'http://example.com', 'assertion_value')
+ _client.jwt_grant(request, "http://example.com", "assertion_value")
def test_id_token_jwt_grant():
now = _helpers.utcnow()
id_token_expiry = _helpers.datetime_to_secs(now)
- id_token = jwt.encode(SIGNER, {'exp': id_token_expiry}).decode('utf-8')
- request = make_request({
- 'id_token': id_token,
- 'extra': 'data'})
+ id_token = jwt.encode(SIGNER, {"exp": id_token_expiry}).decode("utf-8")
+ request = make_request({"id_token": id_token, "extra": "data"})
token, expiry, extra_data = _client.id_token_jwt_grant(
- request, 'http://example.com', 'assertion_value')
+ request, "http://example.com", "assertion_value"
+ )
# Check request call
- verify_request_params(request, {
- 'grant_type': _client._JWT_GRANT_TYPE,
- 'assertion': 'assertion_value'
- })
+ verify_request_params(
+ request, {"grant_type": _client._JWT_GRANT_TYPE, "assertion": "assertion_value"}
+ )
# Check result
assert token == id_token
# JWT does not store microseconds
now = now.replace(microsecond=0)
assert expiry == now
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
def test_id_token_jwt_grant_no_access_token():
- request = make_request({
- # No access token.
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ # No access token.
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
with pytest.raises(exceptions.RefreshError):
- _client.id_token_jwt_grant(
- request, 'http://example.com', 'assertion_value')
+ _client.id_token_jwt_grant(request, "http://example.com", "assertion_value")
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_grant(unused_utcnow):
- request = make_request({
- 'access_token': 'token',
- 'refresh_token': 'new_refresh_token',
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
token, refresh_token, expiry, extra_data = _client.refresh_grant(
- request, 'http://example.com', 'refresh_token', 'client_id',
- 'client_secret')
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )
# Check request call
- verify_request_params(request, {
- 'grant_type': _client._REFRESH_GRANT_TYPE,
- 'refresh_token': 'refresh_token',
- 'client_id': 'client_id',
- 'client_secret': 'client_secret'
- })
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ },
+ )
# Check result
- assert token == 'token'
- assert refresh_token == 'new_refresh_token'
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
-@mock.patch('google.auth._helpers.utcnow', return_value=datetime.datetime.min)
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh_grant_with_scopes(unused_utcnow):
- request = make_request({
- 'access_token': 'token',
- 'refresh_token': 'new_refresh_token',
- 'expires_in': 500,
- 'extra': 'data',
- 'scope': SCOPES_AS_STRING})
+ request = make_request(
+ {
+ "access_token": "token",
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ "scope": SCOPES_AS_STRING,
+ }
+ )
token, refresh_token, expiry, extra_data = _client.refresh_grant(
- request, 'http://example.com', 'refresh_token', 'client_id',
- 'client_secret', SCOPES_AS_LIST)
+ request,
+ "http://example.com",
+ "refresh_token",
+ "client_id",
+ "client_secret",
+ SCOPES_AS_LIST,
+ )
# Check request call.
- verify_request_params(request, {
- 'grant_type': _client._REFRESH_GRANT_TYPE,
- 'refresh_token': 'refresh_token',
- 'client_id': 'client_id',
- 'client_secret': 'client_secret',
- 'scope': SCOPES_AS_STRING
- })
+ verify_request_params(
+ request,
+ {
+ "grant_type": _client._REFRESH_GRANT_TYPE,
+ "refresh_token": "refresh_token",
+ "client_id": "client_id",
+ "client_secret": "client_secret",
+ "scope": SCOPES_AS_STRING,
+ },
+ )
# Check result.
- assert token == 'token'
- assert refresh_token == 'new_refresh_token'
+ assert token == "token"
+ assert refresh_token == "new_refresh_token"
assert expiry == datetime.datetime.min + datetime.timedelta(seconds=500)
- assert extra_data['extra'] == 'data'
+ assert extra_data["extra"] == "data"
def test_refresh_grant_no_access_token():
- request = make_request({
- # No access token.
- 'refresh_token': 'new_refresh_token',
- 'expires_in': 500,
- 'extra': 'data'})
+ request = make_request(
+ {
+ # No access token.
+ "refresh_token": "new_refresh_token",
+ "expires_in": 500,
+ "extra": "data",
+ }
+ )
with pytest.raises(exceptions.RefreshError):
_client.refresh_grant(
- request, 'http://example.com', 'refresh_token', 'client_id',
- 'client_secret')
+ request, "http://example.com", "refresh_token", "client_id", "client_secret"
+ )
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index 3231509..8bfdd7e 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -25,26 +25,29 @@
from google.oauth2 import credentials
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
-AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, 'authorized_user.json')
+AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")
-with open(AUTH_USER_JSON_FILE, 'r') as fh:
+with open(AUTH_USER_JSON_FILE, "r") as fh:
AUTH_USER_INFO = json.load(fh)
class TestCredentials(object):
- TOKEN_URI = 'https://example.com/oauth2/token'
- REFRESH_TOKEN = 'refresh_token'
- CLIENT_ID = 'client_id'
- CLIENT_SECRET = 'client_secret'
+ TOKEN_URI = "https://example.com/oauth2/token"
+ REFRESH_TOKEN = "refresh_token"
+ CLIENT_ID = "client_id"
+ CLIENT_SECRET = "client_secret"
@classmethod
def make_credentials(cls):
return credentials.Credentials(
- token=None, refresh_token=cls.REFRESH_TOKEN,
- token_uri=cls.TOKEN_URI, client_id=cls.CLIENT_ID,
- client_secret=cls.CLIENT_SECRET)
+ token=None,
+ refresh_token=cls.REFRESH_TOKEN,
+ token_uri=cls.TOKEN_URI,
+ client_id=cls.CLIENT_ID,
+ client_secret=cls.CLIENT_SECRET,
+ )
def test_default_state(self):
credentials = self.make_credentials()
@@ -59,14 +62,15 @@
assert credentials.client_id == self.CLIENT_ID
assert credentials.client_secret == self.CLIENT_SECRET
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_refresh_success(self, unused_utcnow, refresh_grant):
- token = 'token'
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token}
+ grant_response = {"id_token": mock.sentinel.id_token}
refresh_grant.return_value = (
# Access token
token,
@@ -75,7 +79,8 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
credentials = self.make_credentials()
@@ -85,8 +90,13 @@
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, None)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ None,
+ )
# Check that the credentials have the token and expiry
assert credentials.token == token
@@ -99,24 +109,25 @@
def test_refresh_no_refresh_token(self):
request = mock.create_autospec(transport.Request)
- credentials_ = credentials.Credentials(
- token=None, refresh_token=None)
+ credentials_ = credentials.Credentials(token=None, refresh_token=None)
- with pytest.raises(exceptions.RefreshError, match='necessary fields'):
+ with pytest.raises(exceptions.RefreshError, match="necessary fields"):
credentials_.refresh(request)
request.assert_not_called()
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_credentials_with_scopes_requested_refresh_success(
- self, unused_utcnow, refresh_grant):
- scopes = ['email', 'profile']
- token = 'token'
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token}
+ grant_response = {"id_token": mock.sentinel.id_token}
refresh_grant.return_value = (
# Access token
token,
@@ -125,21 +136,31 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
creds = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET, scopes=scopes)
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ )
# Refresh credentials
creds.refresh(request)
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, scopes)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ )
# Check that the credentials have the token and expiry
assert creds.token == token
@@ -151,17 +172,21 @@
# expired.)
assert creds.valid
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_credentials_with_scopes_returned_refresh_success(
- self, unused_utcnow, refresh_grant):
- scopes = ['email', 'profile']
- token = 'token'
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token,
- 'scopes': ' '.join(scopes)}
+ grant_response = {
+ "id_token": mock.sentinel.id_token,
+ "scopes": " ".join(scopes),
+ }
refresh_grant.return_value = (
# Access token
token,
@@ -170,21 +195,31 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
creds = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET, scopes=scopes)
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ )
# Refresh credentials
creds.refresh(request)
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, scopes)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ )
# Check that the credentials have the token and expiry
assert creds.token == token
@@ -196,18 +231,22 @@
# expired.)
assert creds.valid
- @mock.patch('google.oauth2._client.refresh_grant', autospec=True)
+ @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
@mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min + _helpers.CLOCK_SKEW)
+ "google.auth._helpers.utcnow",
+ return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+ )
def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
- self, unused_utcnow, refresh_grant):
- scopes = ['email', 'profile']
- scopes_returned = ['email']
- token = 'token'
+ self, unused_utcnow, refresh_grant
+ ):
+ scopes = ["email", "profile"]
+ scopes_returned = ["email"]
+ token = "token"
expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
- grant_response = {'id_token': mock.sentinel.id_token,
- 'scopes': ' '.join(scopes_returned)}
+ grant_response = {
+ "id_token": mock.sentinel.id_token,
+ "scopes": " ".join(scopes_returned),
+ }
refresh_grant.return_value = (
# Access token
token,
@@ -216,23 +255,34 @@
# Expiry,
expiry,
# Extra data
- grant_response)
+ grant_response,
+ )
request = mock.create_autospec(transport.Request)
creds = credentials.Credentials(
- token=None, refresh_token=self.REFRESH_TOKEN,
- token_uri=self.TOKEN_URI, client_id=self.CLIENT_ID,
- client_secret=self.CLIENT_SECRET, scopes=scopes)
+ token=None,
+ refresh_token=self.REFRESH_TOKEN,
+ token_uri=self.TOKEN_URI,
+ client_id=self.CLIENT_ID,
+ client_secret=self.CLIENT_SECRET,
+ scopes=scopes,
+ )
# Refresh credentials
- with pytest.raises(exceptions.RefreshError,
- match='Not all requested scopes were granted'):
+ with pytest.raises(
+ exceptions.RefreshError, match="Not all requested scopes were granted"
+ ):
creds.refresh(request)
# Check jwt grant call.
refresh_grant.assert_called_with(
- request, self.TOKEN_URI, self.REFRESH_TOKEN, self.CLIENT_ID,
- self.CLIENT_SECRET, scopes)
+ request,
+ self.TOKEN_URI,
+ self.REFRESH_TOKEN,
+ self.CLIENT_ID,
+ self.CLIENT_SECRET,
+ scopes,
+ )
# Check that the credentials have the token and expiry
assert creds.token == token
@@ -248,37 +298,36 @@
info = AUTH_USER_INFO.copy()
creds = credentials.Credentials.from_authorized_user_info(info)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes is None
- scopes = ['email', 'profile']
- creds = credentials.Credentials.from_authorized_user_info(
- info, scopes)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ scopes = ["email", "profile"]
+ creds = credentials.Credentials.from_authorized_user_info(info, scopes)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes == scopes
def test_from_authorized_user_file(self):
info = AUTH_USER_INFO.copy()
- creds = credentials.Credentials.from_authorized_user_file(
- AUTH_USER_JSON_FILE)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE)
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes is None
- scopes = ['email', 'profile']
+ scopes = ["email", "profile"]
creds = credentials.Credentials.from_authorized_user_file(
- AUTH_USER_JSON_FILE, scopes)
- assert creds.client_secret == info['client_secret']
- assert creds.client_id == info['client_id']
- assert creds.refresh_token == info['refresh_token']
+ AUTH_USER_JSON_FILE, scopes
+ )
+ assert creds.client_secret == info["client_secret"]
+ assert creds.client_id == info["client_id"]
+ assert creds.refresh_token == info["refresh_token"]
assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
assert creds.scopes == scopes
diff --git a/tests/oauth2/test_id_token.py b/tests/oauth2/test_id_token.py
index 360f92f..980a8e9 100644
--- a/tests/oauth2/test_id_token.py
+++ b/tests/oauth2/test_id_token.py
@@ -27,7 +27,7 @@
response.status = status
if data is not None:
- response.data = json.dumps(data).encode('utf-8')
+ response.data = json.dumps(data).encode("utf-8")
request = mock.create_autospec(transport.Request)
request.return_value = response
@@ -35,12 +35,12 @@
def test__fetch_certs_success():
- certs = {'1': 'cert'}
+ certs = {"1": "cert"}
request = make_request(200, certs)
returned_certs = id_token._fetch_certs(request, mock.sentinel.cert_url)
- request.assert_called_once_with(mock.sentinel.cert_url, method='GET')
+ request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
assert returned_certs == certs
@@ -50,66 +50,67 @@
with pytest.raises(exceptions.TransportError):
id_token._fetch_certs(request, mock.sentinel.cert_url)
- request.assert_called_once_with(mock.sentinel.cert_url, method='GET')
+ request.assert_called_once_with(mock.sentinel.cert_url, method="GET")
-@mock.patch('google.auth.jwt.decode', autospec=True)
-@mock.patch('google.oauth2.id_token._fetch_certs', autospec=True)
+@mock.patch("google.auth.jwt.decode", autospec=True)
+@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True)
def test_verify_token(_fetch_certs, decode):
result = id_token.verify_token(mock.sentinel.token, mock.sentinel.request)
assert result == decode.return_value
_fetch_certs.assert_called_once_with(
- mock.sentinel.request, id_token._GOOGLE_OAUTH2_CERTS_URL)
+ mock.sentinel.request, id_token._GOOGLE_OAUTH2_CERTS_URL
+ )
decode.assert_called_once_with(
- mock.sentinel.token,
- certs=_fetch_certs.return_value,
- audience=None)
+ mock.sentinel.token, certs=_fetch_certs.return_value, audience=None
+ )
-@mock.patch('google.auth.jwt.decode', autospec=True)
-@mock.patch('google.oauth2.id_token._fetch_certs', autospec=True)
+@mock.patch("google.auth.jwt.decode", autospec=True)
+@mock.patch("google.oauth2.id_token._fetch_certs", autospec=True)
def test_verify_token_args(_fetch_certs, decode):
result = id_token.verify_token(
mock.sentinel.token,
mock.sentinel.request,
audience=mock.sentinel.audience,
- certs_url=mock.sentinel.certs_url)
+ certs_url=mock.sentinel.certs_url,
+ )
assert result == decode.return_value
- _fetch_certs.assert_called_once_with(
- mock.sentinel.request, mock.sentinel.certs_url)
+ _fetch_certs.assert_called_once_with(mock.sentinel.request, mock.sentinel.certs_url)
decode.assert_called_once_with(
mock.sentinel.token,
certs=_fetch_certs.return_value,
- audience=mock.sentinel.audience)
+ audience=mock.sentinel.audience,
+ )
-@mock.patch('google.oauth2.id_token.verify_token', autospec=True)
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
def test_verify_oauth2_token(verify_token):
result = id_token.verify_oauth2_token(
- mock.sentinel.token,
- mock.sentinel.request,
- audience=mock.sentinel.audience)
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
assert result == verify_token.return_value
verify_token.assert_called_once_with(
mock.sentinel.token,
mock.sentinel.request,
audience=mock.sentinel.audience,
- certs_url=id_token._GOOGLE_OAUTH2_CERTS_URL)
+ certs_url=id_token._GOOGLE_OAUTH2_CERTS_URL,
+ )
-@mock.patch('google.oauth2.id_token.verify_token', autospec=True)
+@mock.patch("google.oauth2.id_token.verify_token", autospec=True)
def test_verify_firebase_token(verify_token):
result = id_token.verify_firebase_token(
- mock.sentinel.token,
- mock.sentinel.request,
- audience=mock.sentinel.audience)
+ mock.sentinel.token, mock.sentinel.request, audience=mock.sentinel.audience
+ )
assert result == verify_token.return_value
verify_token.assert_called_once_with(
mock.sentinel.token,
mock.sentinel.request,
audience=mock.sentinel.audience,
- certs_url=id_token._GOOGLE_APIS_CERTS_URL)
+ certs_url=id_token._GOOGLE_APIS_CERTS_URL,
+ )
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 54ac0f5..0f9d460 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -25,58 +25,58 @@
from google.oauth2 import service_account
-DATA_DIR = os.path.join(os.path.dirname(__file__), '..', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'public_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
PUBLIC_CERT_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'other_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
OTHER_CERT_BYTES = fh.read()
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
-SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
class TestCredentials(object):
- SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
- TOKEN_URI = 'https://example.com/oauth2/token'
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TOKEN_URI = "https://example.com/oauth2/token"
@classmethod
def make_credentials(cls):
return service_account.Credentials(
- SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI)
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI
+ )
def test_from_service_account_info(self):
credentials = service_account.Credentials.from_service_account_info(
- SERVICE_ACCOUNT_INFO)
+ SERVICE_ACCOUNT_INFO
+ )
- assert (credentials._signer.key_id ==
- SERVICE_ACCOUNT_INFO['private_key_id'])
- assert (credentials.service_account_email ==
- SERVICE_ACCOUNT_INFO['client_email'])
- assert credentials._token_uri == SERVICE_ACCOUNT_INFO['token_uri']
+ assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
+ assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
def test_from_service_account_info_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
- scopes = ['email', 'profile']
- subject = 'subject'
- additional_claims = {'meta': 'data'}
+ scopes = ["email", "profile"]
+ subject = "subject"
+ additional_claims = {"meta": "data"}
credentials = service_account.Credentials.from_service_account_info(
- info, scopes=scopes, subject=subject,
- additional_claims=additional_claims)
+ info, scopes=scopes, subject=subject, additional_claims=additional_claims
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials.project_id == info['project_id']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
assert credentials._scopes == scopes
assert credentials._subject == subject
assert credentials._additional_claims == additional_claims
@@ -85,27 +85,31 @@
info = SERVICE_ACCOUNT_INFO.copy()
credentials = service_account.Credentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE)
+ SERVICE_ACCOUNT_JSON_FILE
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials.project_id == info['project_id']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
def test_from_service_account_file_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
- scopes = ['email', 'profile']
- subject = 'subject'
- additional_claims = {'meta': 'data'}
+ scopes = ["email", "profile"]
+ subject = "subject"
+ additional_claims = {"meta": "data"}
credentials = service_account.Credentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE, subject=subject,
- scopes=scopes, additional_claims=additional_claims)
+ SERVICE_ACCOUNT_JSON_FILE,
+ subject=subject,
+ scopes=scopes,
+ additional_claims=additional_claims,
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials.project_id == info['project_id']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials.project_id == info["project_id"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
assert credentials._scopes == scopes
assert credentials._subject == subject
assert credentials._additional_claims == additional_claims
@@ -120,7 +124,7 @@
def test_sign_bytes(self):
credentials = self.make_credentials()
- to_sign = b'123'
+ to_sign = b"123"
signature = credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
@@ -134,46 +138,47 @@
def test_create_scoped(self):
credentials = self.make_credentials()
- scopes = ['email', 'profile']
+ scopes = ["email", "profile"]
credentials = credentials.with_scopes(scopes)
assert credentials._scopes == scopes
def test_with_claims(self):
credentials = self.make_credentials()
- new_credentials = credentials.with_claims({'meep': 'moop'})
- assert new_credentials._additional_claims == {'meep': 'moop'}
+ new_credentials = credentials.with_claims({"meep": "moop"})
+ assert new_credentials._additional_claims == {"meep": "moop"}
def test__make_authorization_grant_assertion(self):
credentials = self.make_credentials()
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
- assert payload['aud'] == self.TOKEN_URI
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["aud"] == self.TOKEN_URI
def test__make_authorization_grant_assertion_scoped(self):
credentials = self.make_credentials()
- scopes = ['email', 'profile']
+ scopes = ["email", "profile"]
credentials = credentials.with_scopes(scopes)
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['scope'] == 'email profile'
+ assert payload["scope"] == "email profile"
def test__make_authorization_grant_assertion_subject(self):
credentials = self.make_credentials()
- subject = 'user@example.com'
+ subject = "user@example.com"
credentials = credentials.with_subject(subject)
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['sub'] == subject
+ assert payload["sub"] == subject
- @mock.patch('google.oauth2._client.jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_refresh_success(self, jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
- {})
+ {},
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Refresh credentials
@@ -196,20 +201,22 @@
# expired)
assert credentials.valid
- @mock.patch('google.oauth2._client.jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
def test_before_request_refreshes(self, jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
jwt_grant.return_value = (
- token, _helpers.utcnow() + datetime.timedelta(seconds=500), None)
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ None,
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Credentials should start as invalid
assert not credentials.valid
# before_request should cause a refresh
- credentials.before_request(
- request, 'GET', 'http://example.com?a=1#3', {})
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
# The refresh endpoint should've been called.
assert jwt_grant.called
@@ -219,40 +226,36 @@
class TestIDTokenCredentials(object):
- SERVICE_ACCOUNT_EMAIL = 'service-account@example.com'
- TOKEN_URI = 'https://example.com/oauth2/token'
- TARGET_AUDIENCE = 'https://example.com'
+ SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
+ TOKEN_URI = "https://example.com/oauth2/token"
+ TARGET_AUDIENCE = "https://example.com"
@classmethod
def make_credentials(cls):
return service_account.IDTokenCredentials(
- SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI,
- cls.TARGET_AUDIENCE)
+ SIGNER, cls.SERVICE_ACCOUNT_EMAIL, cls.TOKEN_URI, cls.TARGET_AUDIENCE
+ )
def test_from_service_account_info(self):
- credentials = (
- service_account.IDTokenCredentials.from_service_account_info(
- SERVICE_ACCOUNT_INFO,
- target_audience=self.TARGET_AUDIENCE))
+ credentials = service_account.IDTokenCredentials.from_service_account_info(
+ SERVICE_ACCOUNT_INFO, target_audience=self.TARGET_AUDIENCE
+ )
- assert (credentials._signer.key_id ==
- SERVICE_ACCOUNT_INFO['private_key_id'])
- assert (credentials.service_account_email ==
- SERVICE_ACCOUNT_INFO['client_email'])
- assert credentials._token_uri == SERVICE_ACCOUNT_INFO['token_uri']
+ assert credentials._signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
+ assert credentials.service_account_email == SERVICE_ACCOUNT_INFO["client_email"]
+ assert credentials._token_uri == SERVICE_ACCOUNT_INFO["token_uri"]
assert credentials._target_audience == self.TARGET_AUDIENCE
def test_from_service_account_file(self):
info = SERVICE_ACCOUNT_INFO.copy()
- credentials = (
- service_account.IDTokenCredentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE,
- target_audience=self.TARGET_AUDIENCE))
+ credentials = service_account.IDTokenCredentials.from_service_account_file(
+ SERVICE_ACCOUNT_JSON_FILE, target_audience=self.TARGET_AUDIENCE
+ )
- assert credentials.service_account_email == info['client_email']
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._token_uri == info['token_uri']
+ assert credentials.service_account_email == info["client_email"]
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._token_uri == info["token_uri"]
assert credentials._target_audience == self.TARGET_AUDIENCE
def test_default_state(self):
@@ -263,7 +266,7 @@
def test_sign_bytes(self):
credentials = self.make_credentials()
- to_sign = b'123'
+ to_sign = b"123"
signature = credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
@@ -277,26 +280,26 @@
def test_with_target_audience(self):
credentials = self.make_credentials()
- new_credentials = credentials.with_target_audience(
- 'https://new.example.com')
- assert new_credentials._target_audience == 'https://new.example.com'
+ new_credentials = credentials.with_target_audience("https://new.example.com")
+ assert new_credentials._target_audience == "https://new.example.com"
def test__make_authorization_grant_assertion(self):
credentials = self.make_credentials()
token = credentials._make_authorization_grant_assertion()
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
- assert payload['aud'] == self.TOKEN_URI
- assert payload['target_audience'] == self.TARGET_AUDIENCE
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["aud"] == self.TOKEN_URI
+ assert payload["target_audience"] == self.TARGET_AUDIENCE
- @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_refresh_success(self, id_token_jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
id_token_jwt_grant.return_value = (
token,
_helpers.utcnow() + datetime.timedelta(seconds=500),
- {})
+ {},
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Refresh credentials
@@ -319,20 +322,22 @@
# expired)
assert credentials.valid
- @mock.patch('google.oauth2._client.id_token_jwt_grant', autospec=True)
+ @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_before_request_refreshes(self, id_token_jwt_grant):
credentials = self.make_credentials()
- token = 'token'
+ token = "token"
id_token_jwt_grant.return_value = (
- token, _helpers.utcnow() + datetime.timedelta(seconds=500), None)
+ token,
+ _helpers.utcnow() + datetime.timedelta(seconds=500),
+ None,
+ )
request = mock.create_autospec(transport.Request, instance=True)
# Credentials should start as invalid
assert not credentials.valid
# before_request should cause a refresh
- credentials.before_request(
- request, 'GET', 'http://example.com?a=1#3', {})
+ credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
# The refresh endpoint should've been called.
assert id_token_jwt_grant.called
diff --git a/tests/test__cloud_sdk.py b/tests/test__cloud_sdk.py
index 58c7270..c5c5c27 100644
--- a/tests/test__cloud_sdk.py
+++ b/tests/test__cloud_sdk.py
@@ -25,29 +25,33 @@
import google.oauth2.credentials
-DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
-AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, 'authorized_user.json')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
+AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, "authorized_user.json")
with io.open(AUTHORIZED_USER_FILE) as fh:
AUTHORIZED_USER_FILE_DATA = json.load(fh)
-SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, "service_account.json")
with io.open(SERVICE_ACCOUNT_FILE) as fh:
SERVICE_ACCOUNT_FILE_DATA = json.load(fh)
-with io.open(os.path.join(DATA_DIR, 'cloud_sdk_config.json'), 'rb') as fh:
+with io.open(os.path.join(DATA_DIR, "cloud_sdk_config.json"), "rb") as fh:
CLOUD_SDK_CONFIG_FILE_DATA = fh.read()
-@pytest.mark.parametrize('data, expected_project_id', [
- (CLOUD_SDK_CONFIG_FILE_DATA, 'example-project'),
- (b'I am some bad json', None),
- (b'{}', None)
-])
+@pytest.mark.parametrize(
+ "data, expected_project_id",
+ [
+ (CLOUD_SDK_CONFIG_FILE_DATA, "example-project"),
+ (b"I am some bad json", None),
+ (b"{}", None),
+ ],
+)
def test_get_project_id(data, expected_project_id):
check_output_patch = mock.patch(
- 'subprocess.check_output', autospec=True, return_value=data)
+ "subprocess.check_output", autospec=True, return_value=data
+ )
with check_output_patch as check_output:
project_id = _cloud_sdk.get_project_id()
@@ -57,100 +61,99 @@
@mock.patch(
- 'subprocess.check_output', autospec=True,
- side_effect=subprocess.CalledProcessError(-1, None))
+ "subprocess.check_output",
+ autospec=True,
+ side_effect=subprocess.CalledProcessError(-1, None),
+)
def test_get_project_id_call_error(check_output):
project_id = _cloud_sdk.get_project_id()
assert project_id is None
assert check_output.called
-@mock.patch('os.name', new='nt')
+@mock.patch("os.name", new="nt")
def test_get_project_id_windows():
check_output_patch = mock.patch(
- 'subprocess.check_output', autospec=True,
- return_value=CLOUD_SDK_CONFIG_FILE_DATA)
+ "subprocess.check_output",
+ autospec=True,
+ return_value=CLOUD_SDK_CONFIG_FILE_DATA,
+ )
with check_output_patch as check_output:
project_id = _cloud_sdk.get_project_id()
- assert project_id == 'example-project'
+ assert project_id == "example-project"
assert check_output.called
# Make sure the executable is `gcloud.cmd`.
args = check_output.call_args[0]
command = args[0]
executable = command[0]
- assert executable == 'gcloud.cmd'
+ assert executable == "gcloud.cmd"
-@mock.patch(
- 'google.auth._cloud_sdk.get_config_path', autospec=True)
+@mock.patch("google.auth._cloud_sdk.get_config_path", autospec=True)
def test_get_application_default_credentials_path(get_config_dir):
- config_path = 'config_path'
+ config_path = "config_path"
get_config_dir.return_value = config_path
credentials_path = _cloud_sdk.get_application_default_credentials_path()
assert credentials_path == os.path.join(
- config_path, _cloud_sdk._CREDENTIALS_FILENAME)
+ config_path, _cloud_sdk._CREDENTIALS_FILENAME
+ )
def test_get_config_path_env_var(monkeypatch):
- config_path_sentinel = 'config_path'
- monkeypatch.setenv(
- environment_vars.CLOUD_SDK_CONFIG_DIR, config_path_sentinel)
+ config_path_sentinel = "config_path"
+ monkeypatch.setenv(environment_vars.CLOUD_SDK_CONFIG_DIR, config_path_sentinel)
config_path = _cloud_sdk.get_config_path()
assert config_path == config_path_sentinel
-@mock.patch('os.path.expanduser')
+@mock.patch("os.path.expanduser")
def test_get_config_path_unix(expanduser):
expanduser.side_effect = lambda path: path
config_path = _cloud_sdk.get_config_path()
- assert os.path.split(config_path) == (
- '~/.config', _cloud_sdk._CONFIG_DIRECTORY)
+ assert os.path.split(config_path) == ("~/.config", _cloud_sdk._CONFIG_DIRECTORY)
-@mock.patch('os.name', new='nt')
+@mock.patch("os.name", new="nt")
def test_get_config_path_windows(monkeypatch):
- appdata = 'appdata'
+ appdata = "appdata"
monkeypatch.setenv(_cloud_sdk._WINDOWS_CONFIG_ROOT_ENV_VAR, appdata)
config_path = _cloud_sdk.get_config_path()
- assert os.path.split(config_path) == (
- appdata, _cloud_sdk._CONFIG_DIRECTORY)
+ assert os.path.split(config_path) == (appdata, _cloud_sdk._CONFIG_DIRECTORY)
-@mock.patch('os.name', new='nt')
+@mock.patch("os.name", new="nt")
def test_get_config_path_no_appdata(monkeypatch):
monkeypatch.delenv(_cloud_sdk._WINDOWS_CONFIG_ROOT_ENV_VAR, raising=False)
- monkeypatch.setenv('SystemDrive', 'G:')
+ monkeypatch.setenv("SystemDrive", "G:")
config_path = _cloud_sdk.get_config_path()
- assert os.path.split(config_path) == (
- 'G:/\\', _cloud_sdk._CONFIG_DIRECTORY)
+ assert os.path.split(config_path) == ("G:/\\", _cloud_sdk._CONFIG_DIRECTORY)
def test_load_authorized_user_credentials():
- credentials = _cloud_sdk.load_authorized_user_credentials(
- AUTHORIZED_USER_FILE_DATA)
+ credentials = _cloud_sdk.load_authorized_user_credentials(AUTHORIZED_USER_FILE_DATA)
assert isinstance(credentials, google.oauth2.credentials.Credentials)
assert credentials.token is None
- assert (credentials._refresh_token ==
- AUTHORIZED_USER_FILE_DATA['refresh_token'])
- assert credentials._client_id == AUTHORIZED_USER_FILE_DATA['client_id']
- assert (credentials._client_secret ==
- AUTHORIZED_USER_FILE_DATA['client_secret'])
- assert (credentials._token_uri ==
- google.oauth2.credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT)
+ assert credentials._refresh_token == AUTHORIZED_USER_FILE_DATA["refresh_token"]
+ assert credentials._client_id == AUTHORIZED_USER_FILE_DATA["client_id"]
+ assert credentials._client_secret == AUTHORIZED_USER_FILE_DATA["client_secret"]
+ assert (
+ credentials._token_uri
+ == google.oauth2.credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
+ )
def test_load_authorized_user_credentials_bad_format():
with pytest.raises(ValueError) as excinfo:
_cloud_sdk.load_authorized_user_credentials({})
- assert excinfo.match(r'missing fields')
+ assert excinfo.match(r"missing fields")
diff --git a/tests/test__default.py b/tests/test__default.py
index d143479..2c86f3f 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -27,94 +27,96 @@
import google.oauth2.credentials
-DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
-AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, 'authorized_user.json')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
+AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, "authorized_user.json")
with open(AUTHORIZED_USER_FILE) as fh:
AUTHORIZED_USER_FILE_DATA = json.load(fh)
AUTHORIZED_USER_CLOUD_SDK_FILE = os.path.join(
- DATA_DIR, 'authorized_user_cloud_sdk.json')
+ DATA_DIR, "authorized_user_cloud_sdk.json"
+)
-SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, "service_account.json")
with open(SERVICE_ACCOUNT_FILE) as fh:
SERVICE_ACCOUNT_FILE_DATA = json.load(fh)
LOAD_FILE_PATCH = mock.patch(
- 'google.auth._default._load_credentials_from_file', return_value=(
- mock.sentinel.credentials, mock.sentinel.project_id), autospec=True)
+ "google.auth._default._load_credentials_from_file",
+ return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+ autospec=True,
+)
def test__load_credentials_from_missing_file():
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
- _default._load_credentials_from_file('')
+ _default._load_credentials_from_file("")
- assert excinfo.match(r'not found')
+ assert excinfo.match(r"not found")
def test__load_credentials_from_file_invalid_json(tmpdir):
- jsonfile = tmpdir.join('invalid.json')
- jsonfile.write('{')
+ jsonfile = tmpdir.join("invalid.json")
+ jsonfile.write("{")
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
_default._load_credentials_from_file(str(jsonfile))
- assert excinfo.match(r'not a valid json file')
+ assert excinfo.match(r"not a valid json file")
def test__load_credentials_from_file_invalid_type(tmpdir):
- jsonfile = tmpdir.join('invalid.json')
- jsonfile.write(json.dumps({'type': 'not-a-real-type'}))
+ jsonfile = tmpdir.join("invalid.json")
+ jsonfile.write(json.dumps({"type": "not-a-real-type"}))
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
_default._load_credentials_from_file(str(jsonfile))
- assert excinfo.match(r'does not have a valid type')
+ assert excinfo.match(r"does not have a valid type")
def test__load_credentials_from_file_authorized_user():
- credentials, project_id = _default._load_credentials_from_file(
- AUTHORIZED_USER_FILE)
+ credentials, project_id = _default._load_credentials_from_file(AUTHORIZED_USER_FILE)
assert isinstance(credentials, google.oauth2.credentials.Credentials)
assert project_id is None
def test__load_credentials_from_file_authorized_user_bad_format(tmpdir):
- filename = tmpdir.join('authorized_user_bad.json')
- filename.write(json.dumps({'type': 'authorized_user'}))
+ filename = tmpdir.join("authorized_user_bad.json")
+ filename.write(json.dumps({"type": "authorized_user"}))
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
_default._load_credentials_from_file(str(filename))
- assert excinfo.match(r'Failed to load authorized user')
- assert excinfo.match(r'missing fields')
+ assert excinfo.match(r"Failed to load authorized user")
+ assert excinfo.match(r"missing fields")
def test__load_credentials_from_file_authorized_user_cloud_sdk():
- with pytest.warns(UserWarning, match='Cloud SDK'):
+ with pytest.warns(UserWarning, match="Cloud SDK"):
credentials, project_id = _default._load_credentials_from_file(
- AUTHORIZED_USER_CLOUD_SDK_FILE)
+ AUTHORIZED_USER_CLOUD_SDK_FILE
+ )
assert isinstance(credentials, google.oauth2.credentials.Credentials)
assert project_id is None
def test__load_credentials_from_file_service_account():
- credentials, project_id = _default._load_credentials_from_file(
- SERVICE_ACCOUNT_FILE)
+ credentials, project_id = _default._load_credentials_from_file(SERVICE_ACCOUNT_FILE)
assert isinstance(credentials, service_account.Credentials)
- assert project_id == SERVICE_ACCOUNT_FILE_DATA['project_id']
+ assert project_id == SERVICE_ACCOUNT_FILE_DATA["project_id"]
def test__load_credentials_from_file_service_account_bad_format(tmpdir):
- filename = tmpdir.join('serivce_account_bad.json')
- filename.write(json.dumps({'type': 'service_account'}))
+ filename = tmpdir.join("serivce_account_bad.json")
+ filename.write(json.dumps({"type": "service_account"}))
with pytest.raises(exceptions.DefaultCredentialsError) as excinfo:
_default._load_credentials_from_file(str(filename))
- assert excinfo.match(r'Failed to load service account')
- assert excinfo.match(r'missing fields')
+ assert excinfo.match(r"Failed to load service account")
+ assert excinfo.match(r"missing fields")
@mock.patch.dict(os.environ, {}, clear=True)
@@ -124,19 +126,19 @@
@LOAD_FILE_PATCH
def test__get_explicit_environ_credentials(load, monkeypatch):
- monkeypatch.setenv(environment_vars.CREDENTIALS, 'filename')
+ monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
credentials, project_id = _default._get_explicit_environ_credentials()
assert credentials is mock.sentinel.credentials
assert project_id is mock.sentinel.project_id
- load.assert_called_with('filename')
+ load.assert_called_with("filename")
@LOAD_FILE_PATCH
def test__get_explicit_environ_credentials_no_project_id(load, monkeypatch):
load.return_value = mock.sentinel.credentials, None
- monkeypatch.setenv(environment_vars.CREDENTIALS, 'filename')
+ monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
credentials, project_id = _default._get_explicit_environ_credentials()
@@ -146,8 +148,8 @@
@LOAD_FILE_PATCH
@mock.patch(
- 'google.auth._cloud_sdk.get_application_default_credentials_path',
- autospec=True)
+ "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
+)
def test__get_gcloud_sdk_credentials(get_adc_path, load):
get_adc_path.return_value = SERVICE_ACCOUNT_FILE
@@ -159,10 +161,10 @@
@mock.patch(
- 'google.auth._cloud_sdk.get_application_default_credentials_path',
- autospec=True)
+ "google.auth._cloud_sdk.get_application_default_credentials_path", autospec=True
+)
def test__get_gcloud_sdk_credentials_non_existent(get_adc_path, tmpdir):
- non_existent = tmpdir.join('non-existent')
+ non_existent = tmpdir.join("non-existent")
get_adc_path.return_value = str(non_existent)
credentials, project_id = _default._get_gcloud_sdk_credentials()
@@ -172,12 +174,13 @@
@mock.patch(
- 'google.auth._cloud_sdk.get_project_id',
- return_value=mock.sentinel.project_id, autospec=True)
-@mock.patch('os.path.isfile', return_value=True, autospec=True)
+ "google.auth._cloud_sdk.get_project_id",
+ return_value=mock.sentinel.project_id,
+ autospec=True,
+)
+@mock.patch("os.path.isfile", return_value=True, autospec=True)
@LOAD_FILE_PATCH
-def test__get_gcloud_sdk_credentials_project_id(
- load, unused_isfile, get_project_id):
+def test__get_gcloud_sdk_credentials_project_id(load, unused_isfile, get_project_id):
# Don't return a project ID from load file, make the function check
# the Cloud SDK project.
load.return_value = mock.sentinel.credentials, None
@@ -189,13 +192,10 @@
assert get_project_id.called
-@mock.patch(
- 'google.auth._cloud_sdk.get_project_id',
- return_value=None, autospec=True)
-@mock.patch('os.path.isfile', return_value=True)
+@mock.patch("google.auth._cloud_sdk.get_project_id", return_value=None, autospec=True)
+@mock.patch("os.path.isfile", return_value=True)
@LOAD_FILE_PATCH
-def test__get_gcloud_sdk_credentials_no_project_id(
- load, unused_isfile, get_project_id):
+def test__get_gcloud_sdk_credentials_no_project_id(load, unused_isfile, get_project_id):
# Don't return a project ID from load file, make the function check
# the Cloud SDK project.
load.return_value = mock.sentinel.credentials, None
@@ -212,6 +212,7 @@
See https://cloud.google.com/appengine/docs/standard/python/refdocs\
/google.appengine.api.app_identity.app_identity
"""
+
def get_application_id(self):
raise NotImplementedError()
@@ -219,10 +220,8 @@
@pytest.fixture
def app_identity(monkeypatch):
"""Mocks the app_identity module for google.auth.app_engine."""
- app_identity_module = mock.create_autospec(
- _AppIdentityModule, instance=True)
- monkeypatch.setattr(
- app_engine, 'app_identity', app_identity_module)
+ app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True)
+ monkeypatch.setattr(app_engine, "app_identity", app_identity_module)
yield app_identity_module
@@ -237,8 +236,9 @@
def test__get_gae_credentials_no_app_engine():
import sys
- with mock.patch.dict('sys.modules'):
- sys.modules['google.auth.app_engine'] = None
+
+ with mock.patch.dict("sys.modules"):
+ sys.modules["google.auth.app_engine"] = None
credentials, project_id = _default._get_gae_credentials()
assert credentials is None
assert project_id is None
@@ -249,21 +249,23 @@
@mock.patch(
- 'google.auth.compute_engine._metadata.ping', return_value=True,
- autospec=True)
+ "google.auth.compute_engine._metadata.ping", return_value=True, autospec=True
+)
@mock.patch(
- 'google.auth.compute_engine._metadata.get_project_id',
- return_value='example-project', autospec=True)
+ "google.auth.compute_engine._metadata.get_project_id",
+ return_value="example-project",
+ autospec=True,
+)
def test__get_gce_credentials(unused_get, unused_ping):
credentials, project_id = _default._get_gce_credentials()
assert isinstance(credentials, compute_engine.Credentials)
- assert project_id == 'example-project'
+ assert project_id == "example-project"
@mock.patch(
- 'google.auth.compute_engine._metadata.ping', return_value=False,
- autospec=True)
+ "google.auth.compute_engine._metadata.ping", return_value=False, autospec=True
+)
def test__get_gce_credentials_no_ping(unused_ping):
credentials, project_id = _default._get_gce_credentials()
@@ -272,11 +274,13 @@
@mock.patch(
- 'google.auth.compute_engine._metadata.ping', return_value=True,
- autospec=True)
+ "google.auth.compute_engine._metadata.ping", return_value=True, autospec=True
+)
@mock.patch(
- 'google.auth.compute_engine._metadata.get_project_id',
- side_effect=exceptions.TransportError(), autospec=True)
+ "google.auth.compute_engine._metadata.get_project_id",
+ side_effect=exceptions.TransportError(),
+ autospec=True,
+)
def test__get_gce_credentials_no_project_id(unused_get, unused_ping):
credentials, project_id = _default._get_gce_credentials()
@@ -286,110 +290,125 @@
def test__get_gce_credentials_no_compute_engine():
import sys
- with mock.patch.dict('sys.modules'):
- sys.modules['google.auth.compute_engine'] = None
+
+ with mock.patch.dict("sys.modules"):
+ sys.modules["google.auth.compute_engine"] = None
credentials, project_id = _default._get_gce_credentials()
assert credentials is None
assert project_id is None
@mock.patch(
- 'google.auth.compute_engine._metadata.ping', return_value=False,
- autospec=True)
+ "google.auth.compute_engine._metadata.ping", return_value=False, autospec=True
+)
def test__get_gce_credentials_explicit_request(ping):
_default._get_gce_credentials(mock.sentinel.request)
ping.assert_called_with(request=mock.sentinel.request)
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
+ "google.auth._default._get_explicit_environ_credentials",
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
- autospec=True)
+ autospec=True,
+)
def test_default_early_out(unused_get):
- assert _default.default() == (
- mock.sentinel.credentials, mock.sentinel.project_id)
+ assert _default.default() == (mock.sentinel.credentials, mock.sentinel.project_id)
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
+ "google.auth._default._get_explicit_environ_credentials",
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
- autospec=True)
+ autospec=True,
+)
def test_default_explict_project_id(unused_get, monkeypatch):
- monkeypatch.setenv(environment_vars.PROJECT, 'explicit-env')
- assert _default.default() == (
- mock.sentinel.credentials, 'explicit-env')
+ monkeypatch.setenv(environment_vars.PROJECT, "explicit-env")
+ assert _default.default() == (mock.sentinel.credentials, "explicit-env")
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
+ "google.auth._default._get_explicit_environ_credentials",
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
- autospec=True)
+ autospec=True,
+)
def test_default_explict_legacy_project_id(unused_get, monkeypatch):
- monkeypatch.setenv(environment_vars.LEGACY_PROJECT, 'explicit-env')
- assert _default.default() == (
- mock.sentinel.credentials, 'explicit-env')
+ monkeypatch.setenv(environment_vars.LEGACY_PROJECT, "explicit-env")
+ assert _default.default() == (mock.sentinel.credentials, "explicit-env")
+@mock.patch("logging.Logger.warning", autospec=True)
@mock.patch(
- 'logging.Logger.warning',
- autospec=True)
+ "google.auth._default._get_explicit_environ_credentials",
+ return_value=(mock.sentinel.credentials, None),
+ autospec=True,
+)
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
- return_value=(mock.sentinel.credentials, None), autospec=True)
+ "google.auth._default._get_gcloud_sdk_credentials",
+ return_value=(mock.sentinel.credentials, None),
+ autospec=True,
+)
@mock.patch(
- 'google.auth._default._get_gcloud_sdk_credentials',
- return_value=(mock.sentinel.credentials, None), autospec=True)
+ "google.auth._default._get_gae_credentials",
+ return_value=(mock.sentinel.credentials, None),
+ autospec=True,
+)
@mock.patch(
- 'google.auth._default._get_gae_credentials',
- return_value=(mock.sentinel.credentials, None), autospec=True)
-@mock.patch(
- 'google.auth._default._get_gce_credentials',
- return_value=(mock.sentinel.credentials, None), autospec=True)
+ "google.auth._default._get_gce_credentials",
+ return_value=(mock.sentinel.credentials, None),
+ autospec=True,
+)
def test_default_without_project_id(
- unused_gce, unused_gae, unused_sdk, unused_explicit, logger_warning):
- assert _default.default() == (
- mock.sentinel.credentials, None)
+ unused_gce, unused_gae, unused_sdk, unused_explicit, logger_warning
+):
+ assert _default.default() == (mock.sentinel.credentials, None)
logger_warning.assert_called_with(mock.ANY, mock.ANY, mock.ANY)
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
- return_value=(None, None), autospec=True)
+ "google.auth._default._get_explicit_environ_credentials",
+ return_value=(None, None),
+ autospec=True,
+)
@mock.patch(
- 'google.auth._default._get_gcloud_sdk_credentials',
- return_value=(None, None), autospec=True)
+ "google.auth._default._get_gcloud_sdk_credentials",
+ return_value=(None, None),
+ autospec=True,
+)
@mock.patch(
- 'google.auth._default._get_gae_credentials',
- return_value=(None, None), autospec=True)
+ "google.auth._default._get_gae_credentials",
+ return_value=(None, None),
+ autospec=True,
+)
@mock.patch(
- 'google.auth._default._get_gce_credentials',
- return_value=(None, None), autospec=True)
+ "google.auth._default._get_gce_credentials",
+ return_value=(None, None),
+ autospec=True,
+)
def test_default_fail(unused_gce, unused_gae, unused_sdk, unused_explicit):
with pytest.raises(exceptions.DefaultCredentialsError):
assert _default.default()
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
+ "google.auth._default._get_explicit_environ_credentials",
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
- autospec=True)
-@mock.patch(
- 'google.auth.credentials.with_scopes_if_required', autospec=True)
+ autospec=True,
+)
+@mock.patch("google.auth.credentials.with_scopes_if_required", autospec=True)
def test_default_scoped(with_scopes, unused_get):
- scopes = ['one', 'two']
+ scopes = ["one", "two"]
credentials, project_id = _default.default(scopes=scopes)
assert credentials == with_scopes.return_value
assert project_id == mock.sentinel.project_id
- with_scopes.assert_called_once_with(
- mock.sentinel.credentials, scopes)
+ with_scopes.assert_called_once_with(mock.sentinel.credentials, scopes)
@mock.patch(
- 'google.auth._default._get_explicit_environ_credentials',
+ "google.auth._default._get_explicit_environ_credentials",
return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
- autospec=True)
+ autospec=True,
+)
def test_default_no_app_engine_compute_engine_module(unused_get):
"""
google.auth.compute_engine and google.auth.app_engine are both optional
@@ -397,8 +416,11 @@
that default fails gracefully if these modules are absent
"""
import sys
- with mock.patch.dict('sys.modules'):
- sys.modules['google.auth.compute_engine'] = None
- sys.modules['google.auth.app_engine'] = None
+
+ with mock.patch.dict("sys.modules"):
+ sys.modules["google.auth.compute_engine"] = None
+ sys.modules["google.auth.app_engine"] = None
assert _default.default() == (
- mock.sentinel.credentials, mock.sentinel.project_id)
+ mock.sentinel.credentials,
+ mock.sentinel.project_id,
+ )
diff --git a/tests/test__helpers.py b/tests/test__helpers.py
index 79bdef3..3714af7 100644
--- a/tests/test__helpers.py
+++ b/tests/test__helpers.py
@@ -56,20 +56,18 @@
def test_datetime_to_secs():
- assert _helpers.datetime_to_secs(
- datetime.datetime(1970, 1, 1)) == 0
- assert _helpers.datetime_to_secs(
- datetime.datetime(1990, 5, 29)) == 643939200
+ assert _helpers.datetime_to_secs(datetime.datetime(1970, 1, 1)) == 0
+ assert _helpers.datetime_to_secs(datetime.datetime(1990, 5, 29)) == 643939200
def test_to_bytes_with_bytes():
- value = b'bytes-val'
+ value = b"bytes-val"
assert _helpers.to_bytes(value) == value
def test_to_bytes_with_unicode():
- value = u'string-val'
- encoded_value = b'string-val'
+ value = u"string-val"
+ encoded_value = b"string-val"
assert _helpers.to_bytes(value) == encoded_value
@@ -79,13 +77,13 @@
def test_from_bytes_with_unicode():
- value = u'bytes-val'
+ value = u"bytes-val"
assert _helpers.from_bytes(value) == value
def test_from_bytes_with_bytes():
- value = b'string-val'
- decoded_value = u'string-val'
+ value = b"string-val"
+ decoded_value = u"string-val"
assert _helpers.from_bytes(value) == decoded_value
@@ -101,53 +99,49 @@
def test_update_query_params_no_params():
- uri = 'http://www.google.com'
- updated = _helpers.update_query(uri, {'a': 'b'})
- assert updated == uri + '?a=b'
+ uri = "http://www.google.com"
+ updated = _helpers.update_query(uri, {"a": "b"})
+ assert updated == uri + "?a=b"
def test_update_query_existing_params():
- uri = 'http://www.google.com?x=y'
- updated = _helpers.update_query(uri, {'a': 'b', 'c': 'd&'})
- _assert_query(updated, {'x': ['y'], 'a': ['b'], 'c': ['d&']})
+ uri = "http://www.google.com?x=y"
+ updated = _helpers.update_query(uri, {"a": "b", "c": "d&"})
+ _assert_query(updated, {"x": ["y"], "a": ["b"], "c": ["d&"]})
def test_update_query_replace_param():
- base_uri = 'http://www.google.com'
- uri = base_uri + '?x=a'
- updated = _helpers.update_query(uri, {'x': 'b', 'y': 'c'})
- _assert_query(updated, {'x': ['b'], 'y': ['c']})
+ base_uri = "http://www.google.com"
+ uri = base_uri + "?x=a"
+ updated = _helpers.update_query(uri, {"x": "b", "y": "c"})
+ _assert_query(updated, {"x": ["b"], "y": ["c"]})
def test_update_query_remove_param():
- base_uri = 'http://www.google.com'
- uri = base_uri + '?x=a'
- updated = _helpers.update_query(uri, {'y': 'c'}, remove=['x'])
- _assert_query(updated, {'y': ['c']})
+ base_uri = "http://www.google.com"
+ uri = base_uri + "?x=a"
+ updated = _helpers.update_query(uri, {"y": "c"}, remove=["x"])
+ _assert_query(updated, {"y": ["c"]})
def test_scopes_to_string():
cases = [
- ('', ()),
- ('', []),
- ('', ('',)),
- ('', ['', ]),
- ('a', ('a',)),
- ('b', ['b', ]),
- ('a b', ['a', 'b']),
- ('a b', ('a', 'b')),
- ('a b', (s for s in ['a', 'b'])),
+ ("", ()),
+ ("", []),
+ ("", ("",)),
+ ("", [""]),
+ ("a", ("a",)),
+ ("b", ["b"]),
+ ("a b", ["a", "b"]),
+ ("a b", ("a", "b")),
+ ("a b", (s for s in ["a", "b"])),
]
for expected, case in cases:
assert _helpers.scopes_to_string(case) == expected
def test_string_to_scopes():
- cases = [
- ('', []),
- ('a', ['a']),
- ('a b c d e f', ['a', 'b', 'c', 'd', 'e', 'f']),
- ]
+ cases = [("", []), ("a", ["a"]), ("a b c d e f", ["a", "b", "c", "d", "e", "f"])]
for case, expected in cases:
assert _helpers.string_to_scopes(case) == expected
@@ -155,14 +149,14 @@
def test_padded_urlsafe_b64decode():
cases = [
- ('YQ==', b'a'),
- ('YQ', b'a'),
- ('YWE=', b'aa'),
- ('YWE', b'aa'),
- ('YWFhYQ==', b'aaaa'),
- ('YWFhYQ', b'aaaa'),
- ('YWFhYWE=', b'aaaaa'),
- ('YWFhYWE', b'aaaaa'),
+ ("YQ==", b"a"),
+ ("YQ", b"a"),
+ ("YWE=", b"aa"),
+ ("YWE", b"aa"),
+ ("YWFhYQ==", b"aaaa"),
+ ("YWFhYQ", b"aaaa"),
+ ("YWFhYWE=", b"aaaaa"),
+ ("YWFhYWE", b"aaaaa"),
]
for case, expected in cases:
@@ -170,12 +164,7 @@
def test_unpadded_urlsafe_b64encode():
- cases = [
- (b'', b''),
- (b'a', b'YQ'),
- (b'aa', b'YWE'),
- (b'aaa', b'YWFh'),
- ]
+ cases = [(b"", b""), (b"a", b"YQ"), (b"aa", b"YWE"), (b"aaa", b"YWFh")]
for case, expected in cases:
assert _helpers.unpadded_urlsafe_b64encode(case) == expected
diff --git a/tests/test__oauth2client.py b/tests/test__oauth2client.py
index 832b24f..520f943 100644
--- a/tests/test__oauth2client.py
+++ b/tests/test__oauth2client.py
@@ -26,17 +26,23 @@
from google.auth import _oauth2client
-DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
def test__convert_oauth2_credentials():
old_credentials = oauth2client.client.OAuth2Credentials(
- 'access_token', 'client_id', 'client_secret', 'refresh_token',
- datetime.datetime.min, 'token_uri', 'user_agent', scopes='one two')
+ "access_token",
+ "client_id",
+ "client_secret",
+ "refresh_token",
+ datetime.datetime.min,
+ "token_uri",
+ "user_agent",
+ scopes="one two",
+ )
- new_credentials = _oauth2client._convert_oauth2_credentials(
- old_credentials)
+ new_credentials = _oauth2client._convert_oauth2_credentials(old_credentials)
assert new_credentials.token == old_credentials.access_token
assert new_credentials._refresh_token == old_credentials.refresh_token
@@ -48,68 +54,74 @@
def test__convert_service_account_credentials():
old_class = oauth2client.service_account.ServiceAccountCredentials
- old_credentials = old_class.from_json_keyfile_name(
- SERVICE_ACCOUNT_JSON_FILE)
+ old_credentials = old_class.from_json_keyfile_name(SERVICE_ACCOUNT_JSON_FILE)
new_credentials = _oauth2client._convert_service_account_credentials(
- old_credentials)
+ old_credentials
+ )
- assert (new_credentials.service_account_email ==
- old_credentials.service_account_email)
+ assert (
+ new_credentials.service_account_email == old_credentials.service_account_email
+ )
assert new_credentials._signer.key_id == old_credentials._private_key_id
assert new_credentials._token_uri == old_credentials.token_uri
def test__convert_service_account_credentials_with_jwt():
old_class = oauth2client.service_account._JWTAccessCredentials
- old_credentials = old_class.from_json_keyfile_name(
- SERVICE_ACCOUNT_JSON_FILE)
+ old_credentials = old_class.from_json_keyfile_name(SERVICE_ACCOUNT_JSON_FILE)
new_credentials = _oauth2client._convert_service_account_credentials(
- old_credentials)
+ old_credentials
+ )
- assert (new_credentials.service_account_email ==
- old_credentials.service_account_email)
+ assert (
+ new_credentials.service_account_email == old_credentials.service_account_email
+ )
assert new_credentials._signer.key_id == old_credentials._private_key_id
assert new_credentials._token_uri == old_credentials.token_uri
def test__convert_gce_app_assertion_credentials():
old_credentials = oauth2client.contrib.gce.AppAssertionCredentials(
- email='some_email')
+ email="some_email"
+ )
new_credentials = _oauth2client._convert_gce_app_assertion_credentials(
- old_credentials)
+ old_credentials
+ )
- assert (new_credentials.service_account_email ==
- old_credentials.service_account_email)
+ assert (
+ new_credentials.service_account_email == old_credentials.service_account_email
+ )
@pytest.fixture
def mock_oauth2client_gae_imports(mock_non_existent_module):
- mock_non_existent_module('google.appengine.api.app_identity')
- mock_non_existent_module('google.appengine.ext.ndb')
- mock_non_existent_module('google.appengine.ext.webapp.util')
- mock_non_existent_module('webapp2')
+ mock_non_existent_module("google.appengine.api.app_identity")
+ mock_non_existent_module("google.appengine.ext.ndb")
+ mock_non_existent_module("google.appengine.ext.webapp.util")
+ mock_non_existent_module("webapp2")
-@mock.patch('google.auth.app_engine.app_identity')
+@mock.patch("google.auth.app_engine.app_identity")
def test__convert_appengine_app_assertion_credentials(
- app_identity, mock_oauth2client_gae_imports):
+ app_identity, mock_oauth2client_gae_imports
+):
import oauth2client.contrib.appengine
- service_account_id = 'service_account_id'
+ service_account_id = "service_account_id"
old_credentials = oauth2client.contrib.appengine.AppAssertionCredentials(
- scope='one two', service_account_id=service_account_id)
+ scope="one two", service_account_id=service_account_id
+ )
- new_credentials = (
- _oauth2client._convert_appengine_app_assertion_credentials(
- old_credentials))
+ new_credentials = _oauth2client._convert_appengine_app_assertion_credentials(
+ old_credentials
+ )
- assert new_credentials.scopes == ['one', 'two']
- assert (new_credentials._service_account_id ==
- old_credentials.service_account_id)
+ assert new_credentials.scopes == ["one", "two"]
+ assert new_credentials._service_account_id == old_credentials.service_account_id
class FakeCredentials(object):
@@ -117,10 +129,10 @@
def test_convert_success():
- convert_function = mock.Mock(spec=['__call__'])
+ convert_function = mock.Mock(spec=["__call__"])
conversion_map_patch = mock.patch.object(
- _oauth2client, '_CLASS_CONVERSION_MAP',
- {FakeCredentials: convert_function})
+ _oauth2client, "_CLASS_CONVERSION_MAP", {FakeCredentials: convert_function}
+ )
credentials = FakeCredentials()
with conversion_map_patch:
@@ -132,9 +144,9 @@
def test_convert_not_found():
with pytest.raises(ValueError) as excinfo:
- _oauth2client.convert('a string is not a real credentials class')
+ _oauth2client.convert("a string is not a real credentials class")
- assert excinfo.match('Unable to convert')
+ assert excinfo.match("Unable to convert")
@pytest.fixture
@@ -144,14 +156,15 @@
def test_import_has_app_engine(
- mock_oauth2client_gae_imports, reset__oauth2client_module):
+ mock_oauth2client_gae_imports, reset__oauth2client_module
+):
reload_module(_oauth2client)
assert _oauth2client._HAS_APPENGINE
def test_import_without_oauth2client(monkeypatch, reset__oauth2client_module):
- monkeypatch.setitem(sys.modules, 'oauth2client', None)
+ monkeypatch.setitem(sys.modules, "oauth2client", None)
with pytest.raises(ImportError) as excinfo:
reload_module(_oauth2client)
- assert excinfo.match('oauth2client')
+ assert excinfo.match("oauth2client")
diff --git a/tests/test__service_account_info.py b/tests/test__service_account_info.py
index ef41e27..4419f67 100644
--- a/tests/test__service_account_info.py
+++ b/tests/test__service_account_info.py
@@ -22,42 +22,41 @@
from google.auth import crypt
-DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
def test_from_dict():
signer = _service_account_info.from_dict(SERVICE_ACCOUNT_INFO)
assert isinstance(signer, crypt.RSASigner)
- assert signer.key_id == SERVICE_ACCOUNT_INFO['private_key_id']
+ assert signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
def test_from_dict_bad_private_key():
info = SERVICE_ACCOUNT_INFO.copy()
- info['private_key'] = 'garbage'
+ info["private_key"] = "garbage"
with pytest.raises(ValueError) as excinfo:
_service_account_info.from_dict(info)
- assert excinfo.match(r'key')
+ assert excinfo.match(r"key")
def test_from_dict_bad_format():
with pytest.raises(ValueError) as excinfo:
- _service_account_info.from_dict({}, require=('meep',))
+ _service_account_info.from_dict({}, require=("meep",))
- assert excinfo.match(r'missing fields')
+ assert excinfo.match(r"missing fields")
def test_from_filename():
- info, signer = _service_account_info.from_filename(
- SERVICE_ACCOUNT_JSON_FILE)
+ info, signer = _service_account_info.from_filename(SERVICE_ACCOUNT_JSON_FILE)
for key, value in six.iteritems(SERVICE_ACCOUNT_INFO):
assert info[key] == value
assert isinstance(signer, crypt.RSASigner)
- assert signer.key_id == SERVICE_ACCOUNT_INFO['private_key_id']
+ assert signer.key_id == SERVICE_ACCOUNT_INFO["private_key_id"]
diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py
index 70fa5ca..9559a2c 100644
--- a/tests/test_app_engine.py
+++ b/tests/test_app_engine.py
@@ -25,6 +25,7 @@
See https://cloud.google.com/appengine/docs/standard/python/refdocs
/google.appengine.api.app_identity.app_identity
"""
+
def get_application_id(self):
raise NotImplementedError()
@@ -41,10 +42,8 @@
@pytest.fixture
def app_identity(monkeypatch):
"""Mocks the app_identity module for google.auth.app_engine."""
- app_identity_module = mock.create_autospec(
- _AppIdentityModule, instance=True)
- monkeypatch.setattr(
- app_engine, 'app_identity', app_identity_module)
+ app_identity_module = mock.create_autospec(_AppIdentityModule, instance=True)
+ monkeypatch.setattr(app_engine, "app_identity", app_identity_module)
yield app_identity_module
@@ -57,13 +56,15 @@
with pytest.raises(EnvironmentError) as excinfo:
assert app_engine.get_project_id()
- assert excinfo.match(r'App Engine APIs are not available')
+ assert excinfo.match(r"App Engine APIs are not available")
class TestSigner(object):
def test_key_id(self, app_identity):
app_identity.sign_blob.return_value = (
- mock.sentinel.key_id, mock.sentinel.signature)
+ mock.sentinel.key_id,
+ mock.sentinel.signature,
+ )
signer = app_engine.Signer()
@@ -71,10 +72,12 @@
def test_sign(self, app_identity):
app_identity.sign_blob.return_value = (
- mock.sentinel.key_id, mock.sentinel.signature)
+ mock.sentinel.key_id,
+ mock.sentinel.signature,
+ )
signer = app_engine.Signer()
- to_sign = b'123'
+ to_sign = b"123"
signature = signer.sign(to_sign)
@@ -87,7 +90,7 @@
with pytest.raises(EnvironmentError) as excinfo:
app_engine.Credentials()
- assert excinfo.match(r'App Engine APIs are not available')
+ assert excinfo.match(r"App Engine APIs are not available")
def test_default_state(self, app_identity):
credentials = app_engine.Credentials()
@@ -106,52 +109,52 @@
assert not credentials.scopes
assert credentials.requires_scopes
- scoped_credentials = credentials.with_scopes(['email'])
+ scoped_credentials = credentials.with_scopes(["email"])
- assert scoped_credentials.has_scopes(['email'])
+ assert scoped_credentials.has_scopes(["email"])
assert not scoped_credentials.requires_scopes
def test_service_account_email_implicit(self, app_identity):
app_identity.get_service_account_name.return_value = (
- mock.sentinel.service_account_email)
+ mock.sentinel.service_account_email
+ )
credentials = app_engine.Credentials()
- assert (credentials.service_account_email ==
- mock.sentinel.service_account_email)
+ assert credentials.service_account_email == mock.sentinel.service_account_email
assert app_identity.get_service_account_name.called
def test_service_account_email_explicit(self, app_identity):
credentials = app_engine.Credentials(
- service_account_id=mock.sentinel.service_account_email)
+ service_account_id=mock.sentinel.service_account_email
+ )
- assert (credentials.service_account_email ==
- mock.sentinel.service_account_email)
+ assert credentials.service_account_email == mock.sentinel.service_account_email
assert not app_identity.get_service_account_name.called
- @mock.patch(
- 'google.auth._helpers.utcnow',
- return_value=datetime.datetime.min)
+ @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
def test_refresh(self, utcnow, app_identity):
- token = 'token'
+ token = "token"
ttl = 643942923
app_identity.get_access_token.return_value = token, ttl
- credentials = app_engine.Credentials(scopes=['email'])
+ credentials = app_engine.Credentials(scopes=["email"])
credentials.refresh(None)
app_identity.get_access_token.assert_called_with(
- credentials.scopes, credentials._service_account_id)
+ credentials.scopes, credentials._service_account_id
+ )
assert credentials.token == token
- assert credentials.expiry == datetime.datetime(
- 1990, 5, 29, 1, 2, 3)
+ assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
assert credentials.valid
assert not credentials.expired
def test_sign_bytes(self, app_identity):
app_identity.sign_blob.return_value = (
- mock.sentinel.key_id, mock.sentinel.signature)
+ mock.sentinel.key_id,
+ mock.sentinel.signature,
+ )
credentials = app_engine.Credentials()
- to_sign = b'123'
+ to_sign = b"123"
signature = credentials.sign_bytes(to_sign)
diff --git a/tests/test_credentials.py b/tests/test_credentials.py
index b302989..2a89b01 100644
--- a/tests/test_credentials.py
+++ b/tests/test_credentials.py
@@ -35,7 +35,7 @@
def test_expired_and_valid():
credentials = CredentialsImpl()
- credentials.token = 'token'
+ credentials.token = "token"
assert credentials.valid
assert not credentials.expired
@@ -43,9 +43,8 @@
# Set the expiration to one second more than now plus the clock skew
# accomodation. These credentials should be valid.
credentials.expiry = (
- datetime.datetime.utcnow() +
- _helpers.CLOCK_SKEW +
- datetime.timedelta(seconds=1))
+ datetime.datetime.utcnow() + _helpers.CLOCK_SKEW + datetime.timedelta(seconds=1)
+ )
assert credentials.valid
assert not credentials.expired
@@ -60,23 +59,23 @@
def test_before_request():
credentials = CredentialsImpl()
- request = 'token'
+ request = "token"
headers = {}
# First call should call refresh, setting the token.
- credentials.before_request(request, 'http://example.com', 'GET', headers)
+ credentials.before_request(request, "http://example.com", "GET", headers)
assert credentials.valid
- assert credentials.token == 'token'
- assert headers['authorization'] == 'Bearer token'
+ assert credentials.token == "token"
+ assert headers["authorization"] == "Bearer token"
- request = 'token2'
+ request = "token2"
headers = {}
# Second call shouldn't call refresh.
- credentials.before_request(request, 'http://example.com', 'GET', headers)
+ credentials.before_request(request, "http://example.com", "GET", headers)
assert credentials.valid
- assert credentials.token == 'token'
- assert headers['authorization'] == 'Bearer token'
+ assert credentials.token == "token"
+ assert headers["authorization"] == "Bearer token"
def test_anonymous_credentials_ctor():
@@ -100,21 +99,20 @@
anon.apply(headers)
assert headers == {}
with pytest.raises(ValueError):
- anon.apply(headers, token='TOKEN')
+ anon.apply(headers, token="TOKEN")
def test_anonymous_credentials_before_request():
anon = credentials.AnonymousCredentials()
request = object()
- method = 'GET'
- url = 'https://example.com/api/endpoint'
+ method = "GET"
+ url = "https://example.com/api/endpoint"
headers = {}
anon.before_request(request, method, url, headers)
assert headers == {}
-class ReadOnlyScopedCredentialsImpl(
- credentials.ReadOnlyScoped, CredentialsImpl):
+class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl):
@property
def requires_scopes(self):
return super(ReadOnlyScopedCredentialsImpl, self).requires_scopes
@@ -127,12 +125,12 @@
def test_readonly_scoped_credentials_scopes():
credentials = ReadOnlyScopedCredentialsImpl()
- credentials._scopes = ['one', 'two']
- assert credentials.scopes == ['one', 'two']
- assert credentials.has_scopes(['one'])
- assert credentials.has_scopes(['two'])
- assert credentials.has_scopes(['one', 'two'])
- assert not credentials.has_scopes(['three'])
+ credentials._scopes = ["one", "two"]
+ assert credentials.scopes == ["one", "two"]
+ assert credentials.has_scopes(["one"])
+ assert credentials.has_scopes(["two"])
+ assert credentials.has_scopes(["one", "two"])
+ assert not credentials.has_scopes(["three"])
def test_readonly_scoped_credentials_requires_scopes():
@@ -156,16 +154,18 @@
def test_create_scoped_if_required_scoped():
unscoped_credentials = RequiresScopedCredentialsImpl()
scoped_credentials = credentials.with_scopes_if_required(
- unscoped_credentials, ['one', 'two'])
+ unscoped_credentials, ["one", "two"]
+ )
assert scoped_credentials is not unscoped_credentials
assert not scoped_credentials.requires_scopes
- assert scoped_credentials.has_scopes(['one', 'two'])
+ assert scoped_credentials.has_scopes(["one", "two"])
def test_create_scoped_if_required_not_scopes():
unscoped_credentials = CredentialsImpl()
scoped_credentials = credentials.with_scopes_if_required(
- unscoped_credentials, ['one', 'two'])
+ unscoped_credentials, ["one", "two"]
+ )
assert scoped_credentials is unscoped_credentials
diff --git a/tests/test_iam.py b/tests/test_iam.py
index cc09085..52ab9bd 100644
--- a/tests/test_iam.py
+++ b/tests/test_iam.py
@@ -32,7 +32,7 @@
response.status = status
if data is not None:
- response.data = json.dumps(data).encode('utf-8')
+ response.data = json.dumps(data).encode("utf-8")
request = mock.create_autospec(transport.Request)
request.return_value = response
@@ -43,7 +43,7 @@
class CredentialsImpl(google.auth.credentials.Credentials):
def __init__(self):
super(CredentialsImpl, self).__init__()
- self.token = 'token'
+ self.token = "token"
# Force refresh
self.expiry = datetime.datetime.min + _helpers.CLOCK_SKEW
@@ -57,35 +57,33 @@
def test_constructor(self):
request = mock.sentinel.request
credentials = mock.create_autospec(
- google.auth.credentials.Credentials, instance=True)
+ google.auth.credentials.Credentials, instance=True
+ )
- signer = iam.Signer(
- request, credentials, mock.sentinel.service_account_email)
+ signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)
assert signer._request == mock.sentinel.request
assert signer._credentials == credentials
- assert (signer._service_account_email ==
- mock.sentinel.service_account_email)
+ assert signer._service_account_email == mock.sentinel.service_account_email
def test_key_id(self):
signer = iam.Signer(
mock.sentinel.request,
mock.sentinel.credentials,
- mock.sentinel.service_account_email)
+ mock.sentinel.service_account_email,
+ )
assert signer.key_id is None
def test_sign_bytes(self):
- signature = b'DEADBEEF'
- encoded_signature = base64.b64encode(signature).decode('utf-8')
- request = make_request(
- http_client.OK, data={'signature': encoded_signature})
+ signature = b"DEADBEEF"
+ encoded_signature = base64.b64encode(signature).decode("utf-8")
+ request = make_request(http_client.OK, data={"signature": encoded_signature})
credentials = make_credentials()
- signer = iam.Signer(
- request, credentials, mock.sentinel.service_account_email)
+ signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)
- returned_signature = signer.sign('123')
+ returned_signature = signer.sign("123")
assert returned_signature == signature
@@ -93,8 +91,7 @@
request = make_request(http_client.UNAUTHORIZED)
credentials = make_credentials()
- signer = iam.Signer(
- request, credentials, mock.sentinel.service_account_email)
+ signer = iam.Signer(request, credentials, mock.sentinel.service_account_email)
with pytest.raises(exceptions.TransportError):
- signer.sign('123')
+ signer.sign("123")
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index 9945401..1cfcc7c 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -28,35 +28,38 @@
from google.auth.impersonated_credentials import Credentials
from google.oauth2 import service_account
-DATA_DIR = os.path.join(os.path.dirname(__file__), '', 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "", "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-ID_TOKEN_DATA = ('eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew'
- 'Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc'
- 'zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle'
- 'HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L'
- 'y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN'
- 'zA4NTY4In0.redacted')
+ID_TOKEN_DATA = (
+ "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew"
+ "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc"
+ "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle"
+ "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L"
+ "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN"
+ "zA4NTY4In0.redacted"
+)
ID_TOKEN_EXPIRY = 1564475051
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
-SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
-TOKEN_URI = 'https://example.com/oauth2/token'
+SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
+TOKEN_URI = "https://example.com/oauth2/token"
@pytest.fixture
def mock_donor_credentials():
- with mock.patch('google.oauth2._client.jwt_grant', autospec=True) as grant:
+ with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant:
grant.return_value = (
"source token",
_helpers.utcnow() + datetime.timedelta(seconds=500),
- {})
+ {},
+ )
yield grant
@@ -71,54 +74,51 @@
@pytest.fixture
def mock_authorizedsession_sign():
- with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
- autospec=True) as auth_session:
- data = {
- "keyId": "1",
- "signedBlob": "c2lnbmF0dXJl"
- }
+ with mock.patch(
+ "google.auth.transport.requests.AuthorizedSession.request", autospec=True
+ ) as auth_session:
+ data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"}
auth_session.return_value = MockResponse(data, http_client.OK)
yield auth_session
@pytest.fixture
def mock_authorizedsession_idtoken():
- with mock.patch('google.auth.transport.requests.AuthorizedSession.request',
- autospec=True) as auth_session:
- data = {
- "token": ID_TOKEN_DATA
- }
+ with mock.patch(
+ "google.auth.transport.requests.AuthorizedSession.request", autospec=True
+ ) as auth_session:
+ data = {"token": ID_TOKEN_DATA}
auth_session.return_value = MockResponse(data, http_client.OK)
yield auth_session
class TestImpersonatedCredentials(object):
- SERVICE_ACCOUNT_EMAIL = '[email protected]'
- TARGET_PRINCIPAL = '[email protected]'
- TARGET_SCOPES = ['https://www.googleapis.com/auth/devstorage.read_only']
+ SERVICE_ACCOUNT_EMAIL = "[email protected]"
+ TARGET_PRINCIPAL = "[email protected]"
+ TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"]
DELEGATES = []
LIFETIME = 3600
SOURCE_CREDENTIALS = service_account.Credentials(
- SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI)
+ SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI
+ )
- def make_credentials(self, lifetime=LIFETIME,
- target_principal=TARGET_PRINCIPAL):
+ def make_credentials(self, lifetime=LIFETIME, target_principal=TARGET_PRINCIPAL):
return Credentials(
source_credentials=self.SOURCE_CREDENTIALS,
target_principal=target_principal,
target_scopes=self.TARGET_SCOPES,
delegates=self.DELEGATES,
- lifetime=lifetime)
+ lifetime=lifetime,
+ )
def test_default_state(self):
credentials = self.make_credentials()
assert not credentials.valid
assert credentials.expired
- def make_request(self, data, status=http_client.OK,
- headers=None, side_effect=None):
+ def make_request(self, data, status=http_client.OK, headers=None, side_effect=None):
response = mock.create_autospec(transport.Response, instance=False)
response.status = status
response.data = _helpers.to_bytes(data)
@@ -132,40 +132,34 @@
def test_refresh_success(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
+ token = "token"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
assert credentials.valid
assert not credentials.expired
- def test_refresh_failure_malformed_expire_time(
- self, mock_donor_credentials):
+ def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
+ token = "token"
- expire_time = (
- _helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat('T')
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat(
+ "T"
+ )
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
@@ -180,15 +174,15 @@
response_body = {
"error": {
- "code": 403,
- "message": "The caller does not have permission",
- "status": "PERMISSION_DENIED"
+ "code": 403,
+ "message": "The caller does not have permission",
+ "status": "PERMISSION_DENIED",
}
}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.UNAUTHORIZED)
+ data=json.dumps(response_body), status=http_client.UNAUTHORIZED
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
@@ -204,8 +198,8 @@
response_body = {}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.HTTPException)
+ data=json.dumps(response_body), status=http_client.HTTPException
+ )
with pytest.raises(exceptions.RefreshError) as excinfo:
credentials.refresh(request)
@@ -221,31 +215,24 @@
def test_signer(self):
credentials = self.make_credentials()
- assert isinstance(credentials.signer,
- impersonated_credentials.Credentials)
+ assert isinstance(credentials.signer, impersonated_credentials.Credentials)
def test_signer_email(self):
- credentials = self.make_credentials(
- target_principal=self.TARGET_PRINCIPAL)
+ credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
assert credentials.signer_email == self.TARGET_PRINCIPAL
def test_service_account_email(self):
- credentials = self.make_credentials(
- target_principal=self.TARGET_PRINCIPAL)
+ credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL)
assert credentials.service_account_email == self.TARGET_PRINCIPAL
- def test_sign_bytes(self, mock_donor_credentials,
- mock_authorizedsession_sign):
+ def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
+ token = "token"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- token_response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ token_response_body = {"accessToken": token, "expireTime": expire_time}
response = mock.create_autospec(transport.Response, instance=False)
response.status = http_client.OK
@@ -259,26 +246,24 @@
assert credentials.valid
assert not credentials.expired
- signature = credentials.sign_bytes(b'signed bytes')
- assert signature == b'signature'
+ signature = credentials.sign_bytes(b"signed bytes")
+ assert signature == b"signature"
- def test_id_token_success(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_success(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
@@ -286,30 +271,28 @@
assert not credentials.expired
id_creds = impersonated_credentials.IDTokenCredentials(
- credentials, target_audience=target_audience)
+ credentials, target_audience=target_audience
+ )
id_creds.refresh(request)
assert id_creds.token == ID_TOKEN_DATA
- assert id_creds.expiry == datetime.datetime.fromtimestamp(
- ID_TOKEN_EXPIRY)
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY)
- def test_id_token_from_credential(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_from_credential(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
@@ -317,72 +300,66 @@
assert not credentials.expired
id_creds = impersonated_credentials.IDTokenCredentials(
- credentials, target_audience=target_audience)
+ credentials, target_audience=target_audience
+ )
id_creds = id_creds.from_credentials(target_credentials=credentials)
id_creds.refresh(request)
assert id_creds.token == ID_TOKEN_DATA
- def test_id_token_with_target_audience(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_with_target_audience(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
assert credentials.valid
assert not credentials.expired
- id_creds = impersonated_credentials.IDTokenCredentials(
- credentials)
- id_creds = id_creds.with_target_audience(
- target_audience=target_audience)
+ id_creds = impersonated_credentials.IDTokenCredentials(credentials)
+ id_creds = id_creds.with_target_audience(target_audience=target_audience)
id_creds.refresh(request)
assert id_creds.token == ID_TOKEN_DATA
- assert id_creds.expiry == datetime.datetime.fromtimestamp(
- ID_TOKEN_EXPIRY)
+ assert id_creds.expiry == datetime.datetime.fromtimestamp(ID_TOKEN_EXPIRY)
- def test_id_token_invalid_cred(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_invalid_cred(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = None
with pytest.raises(exceptions.GoogleAuthError) as excinfo:
impersonated_credentials.IDTokenCredentials(credentials)
- assert excinfo.match('Provided Credential must be'
- ' impersonated_credentials')
+ assert excinfo.match("Provided Credential must be" " impersonated_credentials")
- def test_id_token_with_include_email(self, mock_donor_credentials,
- mock_authorizedsession_idtoken):
+ def test_id_token_with_include_email(
+ self, mock_donor_credentials, mock_authorizedsession_idtoken
+ ):
credentials = self.make_credentials(lifetime=None)
- token = 'token'
- target_audience = 'https://foo.bar'
+ token = "token"
+ target_audience = "https://foo.bar"
expire_time = (
- _helpers.utcnow().replace(microsecond=0) +
- datetime.timedelta(seconds=500)).isoformat('T') + 'Z'
- response_body = {
- "accessToken": token,
- "expireTime": expire_time
- }
+ _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+ ).isoformat("T") + "Z"
+ response_body = {"accessToken": token, "expireTime": expire_time}
request = self.make_request(
- data=json.dumps(response_body),
- status=http_client.OK)
+ data=json.dumps(response_body), status=http_client.OK
+ )
credentials.refresh(request)
@@ -390,7 +367,8 @@
assert not credentials.expired
id_creds = impersonated_credentials.IDTokenCredentials(
- credentials, target_audience=target_audience)
+ credentials, target_audience=target_audience
+ )
id_creds = id_creds.with_include_email(True)
id_creds.refresh(request)
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 4a64717..b0c6e48 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -26,41 +26,45 @@
from google.auth import jwt
-DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
+DATA_DIR = os.path.join(os.path.dirname(__file__), "data")
-with open(os.path.join(DATA_DIR, 'privatekey.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh:
PRIVATE_KEY_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'public_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "public_cert.pem"), "rb") as fh:
PUBLIC_CERT_BYTES = fh.read()
-with open(os.path.join(DATA_DIR, 'other_cert.pem'), 'rb') as fh:
+with open(os.path.join(DATA_DIR, "other_cert.pem"), "rb") as fh:
OTHER_CERT_BYTES = fh.read()
-SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, 'service_account.json')
+SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json")
-with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
SERVICE_ACCOUNT_INFO = json.load(fh)
@pytest.fixture
def signer():
- return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, '1')
+ return crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1")
def test_encode_basic(signer):
- test_payload = {'test': 'value'}
+ test_payload = {"test": "value"}
encoded = jwt.encode(signer, test_payload)
header, payload, _, _ = jwt._unverified_decode(encoded)
assert payload == test_payload
- assert header == {'typ': 'JWT', 'alg': 'RS256', 'kid': signer.key_id}
+ assert header == {"typ": "JWT", "alg": "RS256", "kid": signer.key_id}
def test_encode_extra_headers(signer):
- encoded = jwt.encode(signer, {}, header={'extra': 'value'})
+ encoded = jwt.encode(signer, {}, header={"extra": "value"})
header = jwt.decode_header(encoded)
assert header == {
- 'typ': 'JWT', 'alg': 'RS256', 'kid': signer.key_id, 'extra': 'value'}
+ "typ": "JWT",
+ "alg": "RS256",
+ "kid": signer.key_id,
+ "extra": "value",
+ }
@pytest.fixture
@@ -68,11 +72,11 @@
def factory(claims=None, key_id=None):
now = _helpers.datetime_to_secs(_helpers.utcnow())
payload = {
- 'aud': '[email protected]',
- 'iat': now,
- 'exp': now + 300,
- 'user': 'billy bob',
- 'metadata': {'meta': 'data'}
+ "aud": "[email protected]",
+ "iat": now,
+ "exp": now + 300,
+ "user": "billy bob",
+ "metadata": {"meta": "data"},
}
payload.update(claims or {})
@@ -83,154 +87,168 @@
key_id = None
return jwt.encode(signer, payload, key_id=key_id)
+
return factory
def test_decode_valid(token_factory):
payload = jwt.decode(token_factory(), certs=PUBLIC_CERT_BYTES)
- assert payload['aud'] == '[email protected]'
- assert payload['user'] == 'billy bob'
- assert payload['metadata']['meta'] == 'data'
+ assert payload["aud"] == "[email protected]"
+ assert payload["user"] == "billy bob"
+ assert payload["metadata"]["meta"] == "data"
def test_decode_valid_with_audience(token_factory):
payload = jwt.decode(
- token_factory(), certs=PUBLIC_CERT_BYTES,
- audience='[email protected]')
- assert payload['aud'] == '[email protected]'
- assert payload['user'] == 'billy bob'
- assert payload['metadata']['meta'] == 'data'
+ token_factory(), certs=PUBLIC_CERT_BYTES, audience="[email protected]"
+ )
+ assert payload["aud"] == "[email protected]"
+ assert payload["user"] == "billy bob"
+ assert payload["metadata"]["meta"] == "data"
def test_decode_valid_unverified(token_factory):
payload = jwt.decode(token_factory(), certs=OTHER_CERT_BYTES, verify=False)
- assert payload['aud'] == '[email protected]'
- assert payload['user'] == 'billy bob'
- assert payload['metadata']['meta'] == 'data'
+ assert payload["aud"] == "[email protected]"
+ assert payload["user"] == "billy bob"
+ assert payload["metadata"]["meta"] == "data"
def test_decode_bad_token_wrong_number_of_segments():
with pytest.raises(ValueError) as excinfo:
- jwt.decode('1.2', PUBLIC_CERT_BYTES)
- assert excinfo.match(r'Wrong number of segments')
+ jwt.decode("1.2", PUBLIC_CERT_BYTES)
+ assert excinfo.match(r"Wrong number of segments")
def test_decode_bad_token_not_base64():
with pytest.raises((ValueError, TypeError)) as excinfo:
- jwt.decode('1.2.3', PUBLIC_CERT_BYTES)
- assert excinfo.match(r'Incorrect padding|more than a multiple of 4')
+ jwt.decode("1.2.3", PUBLIC_CERT_BYTES)
+ assert excinfo.match(r"Incorrect padding|more than a multiple of 4")
def test_decode_bad_token_not_json():
- token = b'.'.join([base64.urlsafe_b64encode(b'123!')] * 3)
+ token = b".".join([base64.urlsafe_b64encode(b"123!")] * 3)
with pytest.raises(ValueError) as excinfo:
jwt.decode(token, PUBLIC_CERT_BYTES)
- assert excinfo.match(r'Can\'t parse segment')
+ assert excinfo.match(r"Can\'t parse segment")
def test_decode_bad_token_no_iat_or_exp(signer):
- token = jwt.encode(signer, {'test': 'value'})
+ token = jwt.encode(signer, {"test": "value"})
with pytest.raises(ValueError) as excinfo:
jwt.decode(token, PUBLIC_CERT_BYTES)
- assert excinfo.match(r'Token does not contain required claim')
+ assert excinfo.match(r"Token does not contain required claim")
def test_decode_bad_token_too_early(token_factory):
- token = token_factory(claims={
- 'iat': _helpers.datetime_to_secs(
- _helpers.utcnow() + datetime.timedelta(hours=1))
- })
+ token = token_factory(
+ claims={
+ "iat": _helpers.datetime_to_secs(
+ _helpers.utcnow() + datetime.timedelta(hours=1)
+ )
+ }
+ )
with pytest.raises(ValueError) as excinfo:
jwt.decode(token, PUBLIC_CERT_BYTES)
- assert excinfo.match(r'Token used too early')
+ assert excinfo.match(r"Token used too early")
def test_decode_bad_token_expired(token_factory):
- token = token_factory(claims={
- 'exp': _helpers.datetime_to_secs(
- _helpers.utcnow() - datetime.timedelta(hours=1))
- })
+ token = token_factory(
+ claims={
+ "exp": _helpers.datetime_to_secs(
+ _helpers.utcnow() - datetime.timedelta(hours=1)
+ )
+ }
+ )
with pytest.raises(ValueError) as excinfo:
jwt.decode(token, PUBLIC_CERT_BYTES)
- assert excinfo.match(r'Token expired')
+ assert excinfo.match(r"Token expired")
def test_decode_bad_token_wrong_audience(token_factory):
token = token_factory()
- audience = '[email protected]'
+ audience = "[email protected]"
with pytest.raises(ValueError) as excinfo:
jwt.decode(token, PUBLIC_CERT_BYTES, audience=audience)
- assert excinfo.match(r'Token has wrong audience')
+ assert excinfo.match(r"Token has wrong audience")
def test_decode_wrong_cert(token_factory):
with pytest.raises(ValueError) as excinfo:
jwt.decode(token_factory(), OTHER_CERT_BYTES)
- assert excinfo.match(r'Could not verify token signature')
+ assert excinfo.match(r"Could not verify token signature")
def test_decode_multicert_bad_cert(token_factory):
- certs = {'1': OTHER_CERT_BYTES, '2': PUBLIC_CERT_BYTES}
+ certs = {"1": OTHER_CERT_BYTES, "2": PUBLIC_CERT_BYTES}
with pytest.raises(ValueError) as excinfo:
jwt.decode(token_factory(), certs)
- assert excinfo.match(r'Could not verify token signature')
+ assert excinfo.match(r"Could not verify token signature")
def test_decode_no_cert(token_factory):
- certs = {'2': PUBLIC_CERT_BYTES}
+ certs = {"2": PUBLIC_CERT_BYTES}
with pytest.raises(ValueError) as excinfo:
jwt.decode(token_factory(), certs)
- assert excinfo.match(r'Certificate for key id 1 not found')
+ assert excinfo.match(r"Certificate for key id 1 not found")
def test_decode_no_key_id(token_factory):
token = token_factory(key_id=False)
- certs = {'2': PUBLIC_CERT_BYTES}
+ certs = {"2": PUBLIC_CERT_BYTES}
payload = jwt.decode(token, certs)
- assert payload['user'] == 'billy bob'
+ assert payload["user"] == "billy bob"
def test_roundtrip_explicit_key_id(token_factory):
- token = token_factory(key_id='3')
- certs = {'2': OTHER_CERT_BYTES, '3': PUBLIC_CERT_BYTES}
+ token = token_factory(key_id="3")
+ certs = {"2": OTHER_CERT_BYTES, "3": PUBLIC_CERT_BYTES}
payload = jwt.decode(token, certs)
- assert payload['user'] == 'billy bob'
+ assert payload["user"] == "billy bob"
class TestCredentials(object):
- SERVICE_ACCOUNT_EMAIL = '[email protected]'
- SUBJECT = 'subject'
- AUDIENCE = 'audience'
- ADDITIONAL_CLAIMS = {'meta': 'data'}
+ SERVICE_ACCOUNT_EMAIL = "[email protected]"
+ SUBJECT = "subject"
+ AUDIENCE = "audience"
+ ADDITIONAL_CLAIMS = {"meta": "data"}
credentials = None
@pytest.fixture(autouse=True)
def credentials_fixture(self, signer):
self.credentials = jwt.Credentials(
- signer, self.SERVICE_ACCOUNT_EMAIL, self.SERVICE_ACCOUNT_EMAIL,
- self.AUDIENCE)
+ signer,
+ self.SERVICE_ACCOUNT_EMAIL,
+ self.SERVICE_ACCOUNT_EMAIL,
+ self.AUDIENCE,
+ )
def test_from_service_account_info(self):
- with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+ with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
info = json.load(fh)
credentials = jwt.Credentials.from_service_account_info(
- info, audience=self.AUDIENCE)
+ info, audience=self.AUDIENCE
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
- assert credentials._subject == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
+ assert credentials._subject == info["client_email"]
assert credentials._audience == self.AUDIENCE
def test_from_service_account_info_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
credentials = jwt.Credentials.from_service_account_info(
- info, subject=self.SUBJECT, audience=self.AUDIENCE,
- additional_claims=self.ADDITIONAL_CLAIMS)
+ info,
+ subject=self.SUBJECT,
+ audience=self.AUDIENCE,
+ additional_claims=self.ADDITIONAL_CLAIMS,
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
assert credentials._subject == self.SUBJECT
assert credentials._audience == self.AUDIENCE
assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
@@ -239,33 +257,37 @@
info = SERVICE_ACCOUNT_INFO.copy()
credentials = jwt.Credentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE, audience=self.AUDIENCE)
+ SERVICE_ACCOUNT_JSON_FILE, audience=self.AUDIENCE
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
- assert credentials._subject == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
+ assert credentials._subject == info["client_email"]
assert credentials._audience == self.AUDIENCE
def test_from_service_account_file_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
credentials = jwt.Credentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE, subject=self.SUBJECT,
- audience=self.AUDIENCE, additional_claims=self.ADDITIONAL_CLAIMS)
+ SERVICE_ACCOUNT_JSON_FILE,
+ subject=self.SUBJECT,
+ audience=self.AUDIENCE,
+ additional_claims=self.ADDITIONAL_CLAIMS,
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
assert credentials._subject == self.SUBJECT
assert credentials._audience == self.AUDIENCE
assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
def test_from_signing_credentials(self):
jwt_from_signing = self.credentials.from_signing_credentials(
- self.credentials,
- audience=mock.sentinel.new_audience)
+ self.credentials, audience=mock.sentinel.new_audience
+ )
jwt_from_info = jwt.Credentials.from_service_account_info(
- SERVICE_ACCOUNT_INFO,
- audience=mock.sentinel.new_audience)
+ SERVICE_ACCOUNT_INFO, audience=mock.sentinel.new_audience
+ )
assert isinstance(jwt_from_signing, jwt.Credentials)
assert jwt_from_signing._signer.key_id == jwt_from_info._signer.key_id
@@ -279,19 +301,17 @@
assert not self.credentials.expired
def test_with_claims(self):
- new_audience = 'new_audience'
- new_credentials = self.credentials.with_claims(
- audience=new_audience)
+ new_audience = "new_audience"
+ new_credentials = self.credentials.with_claims(audience=new_audience)
assert new_credentials._signer == self.credentials._signer
assert new_credentials._issuer == self.credentials._issuer
assert new_credentials._subject == self.credentials._subject
assert new_credentials._audience == new_audience
- assert (new_credentials._additional_claims ==
- self.credentials._additional_claims)
+ assert new_credentials._additional_claims == self.credentials._additional_claims
def test_sign_bytes(self):
- to_sign = b'123'
+ to_sign = b"123"
signature = self.credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
@@ -299,12 +319,11 @@
assert isinstance(self.credentials.signer, crypt.RSASigner)
def test_signer_email(self):
- assert (self.credentials.signer_email ==
- SERVICE_ACCOUNT_INFO['client_email'])
+ assert self.credentials.signer_email == SERVICE_ACCOUNT_INFO["client_email"]
def _verify_token(self, token):
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
return payload
def test_refresh(self):
@@ -318,7 +337,7 @@
self.credentials.refresh(None)
assert not self.credentials.expired
- with mock.patch('google.auth._helpers.utcnow') as now:
+ with mock.patch("google.auth._helpers.utcnow") as now:
one_day = datetime.timedelta(days=1)
now.return_value = self.credentials.expiry + one_day
assert self.credentials.expired
@@ -328,55 +347,58 @@
self.credentials.refresh(None)
self.credentials.before_request(
- None, 'GET', 'http://example.com?a=1#3', headers)
+ None, "GET", "http://example.com?a=1#3", headers
+ )
- header_value = headers['authorization']
- _, token = header_value.split(' ')
+ header_value = headers["authorization"]
+ _, token = header_value.split(" ")
# Since the audience is set, it should use the existing token.
- assert token.encode('utf-8') == self.credentials.token
+ assert token.encode("utf-8") == self.credentials.token
payload = self._verify_token(token)
- assert payload['aud'] == self.AUDIENCE
+ assert payload["aud"] == self.AUDIENCE
def test_before_request_refreshes(self):
assert not self.credentials.valid
- self.credentials.before_request(
- None, 'GET', 'http://example.com?a=1#3', {})
+ self.credentials.before_request(None, "GET", "http://example.com?a=1#3", {})
assert self.credentials.valid
class TestOnDemandCredentials(object):
- SERVICE_ACCOUNT_EMAIL = '[email protected]'
- SUBJECT = 'subject'
- ADDITIONAL_CLAIMS = {'meta': 'data'}
+ SERVICE_ACCOUNT_EMAIL = "[email protected]"
+ SUBJECT = "subject"
+ ADDITIONAL_CLAIMS = {"meta": "data"}
credentials = None
@pytest.fixture(autouse=True)
def credentials_fixture(self, signer):
self.credentials = jwt.OnDemandCredentials(
- signer, self.SERVICE_ACCOUNT_EMAIL, self.SERVICE_ACCOUNT_EMAIL,
- max_cache_size=2)
+ signer,
+ self.SERVICE_ACCOUNT_EMAIL,
+ self.SERVICE_ACCOUNT_EMAIL,
+ max_cache_size=2,
+ )
def test_from_service_account_info(self):
- with open(SERVICE_ACCOUNT_JSON_FILE, 'r') as fh:
+ with open(SERVICE_ACCOUNT_JSON_FILE, "r") as fh:
info = json.load(fh)
credentials = jwt.OnDemandCredentials.from_service_account_info(info)
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
- assert credentials._subject == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
+ assert credentials._subject == info["client_email"]
def test_from_service_account_info_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
credentials = jwt.OnDemandCredentials.from_service_account_info(
- info, subject=self.SUBJECT,
- additional_claims=self.ADDITIONAL_CLAIMS)
+ info, subject=self.SUBJECT, additional_claims=self.ADDITIONAL_CLAIMS
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
assert credentials._subject == self.SUBJECT
assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
@@ -384,29 +406,32 @@
info = SERVICE_ACCOUNT_INFO.copy()
credentials = jwt.OnDemandCredentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE)
+ SERVICE_ACCOUNT_JSON_FILE
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
- assert credentials._subject == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
+ assert credentials._subject == info["client_email"]
def test_from_service_account_file_args(self):
info = SERVICE_ACCOUNT_INFO.copy()
credentials = jwt.OnDemandCredentials.from_service_account_file(
- SERVICE_ACCOUNT_JSON_FILE, subject=self.SUBJECT,
- additional_claims=self.ADDITIONAL_CLAIMS)
+ SERVICE_ACCOUNT_JSON_FILE,
+ subject=self.SUBJECT,
+ additional_claims=self.ADDITIONAL_CLAIMS,
+ )
- assert credentials._signer.key_id == info['private_key_id']
- assert credentials._issuer == info['client_email']
+ assert credentials._signer.key_id == info["private_key_id"]
+ assert credentials._issuer == info["client_email"]
assert credentials._subject == self.SUBJECT
assert credentials._additional_claims == self.ADDITIONAL_CLAIMS
def test_from_signing_credentials(self):
- jwt_from_signing = self.credentials.from_signing_credentials(
- self.credentials)
+ jwt_from_signing = self.credentials.from_signing_credentials(self.credentials)
jwt_from_info = jwt.OnDemandCredentials.from_service_account_info(
- SERVICE_ACCOUNT_INFO)
+ SERVICE_ACCOUNT_INFO
+ )
assert isinstance(jwt_from_signing, jwt.OnDemandCredentials)
assert jwt_from_signing._signer.key_id == jwt_from_info._signer.key_id
@@ -420,9 +445,8 @@
assert not self.credentials.expired
def test_with_claims(self):
- new_claims = {'meep': 'moop'}
- new_credentials = self.credentials.with_claims(
- additional_claims=new_claims)
+ new_claims = {"meep": "moop"}
+ new_credentials = self.credentials.with_claims(additional_claims=new_claims)
assert new_credentials._signer == self.credentials._signer
assert new_credentials._issuer == self.credentials._issuer
@@ -430,7 +454,7 @@
assert new_credentials._additional_claims == new_claims
def test_sign_bytes(self):
- to_sign = b'123'
+ to_sign = b"123"
signature = self.credentials.sign_bytes(to_sign)
assert crypt.verify_signature(to_sign, signature, PUBLIC_CERT_BYTES)
@@ -438,12 +462,11 @@
assert isinstance(self.credentials.signer, crypt.RSASigner)
def test_signer_email(self):
- assert (self.credentials.signer_email ==
- SERVICE_ACCOUNT_INFO['client_email'])
+ assert self.credentials.signer_email == SERVICE_ACCOUNT_INFO["client_email"]
def _verify_token(self, token):
payload = jwt.decode(token, PUBLIC_CERT_BYTES)
- assert payload['iss'] == self.SERVICE_ACCOUNT_EMAIL
+ assert payload["iss"] == self.SERVICE_ACCOUNT_EMAIL
return payload
def test_refresh(self):
@@ -454,25 +477,27 @@
headers = {}
self.credentials.before_request(
- None, 'GET', 'http://example.com?a=1#3', headers)
+ None, "GET", "http://example.com?a=1#3", headers
+ )
- _, token = headers['authorization'].split(' ')
+ _, token = headers["authorization"].split(" ")
payload = self._verify_token(token)
- assert payload['aud'] == 'http://example.com'
+ assert payload["aud"] == "http://example.com"
# Making another request should re-use the same token.
- self.credentials.before_request(
- None, 'GET', 'http://example.com?b=2', headers)
+ self.credentials.before_request(None, "GET", "http://example.com?b=2", headers)
- _, new_token = headers['authorization'].split(' ')
+ _, new_token = headers["authorization"].split(" ")
assert new_token == token
def test_expired_token(self):
- self.credentials._cache['audience'] = (
- mock.sentinel.token, datetime.datetime.min)
+ self.credentials._cache["audience"] = (
+ mock.sentinel.token,
+ datetime.datetime.min,
+ )
- token = self.credentials._get_jwt_for_audience('audience')
+ token = self.credentials._get_jwt_for_audience("audience")
assert token != mock.sentinel.token
diff --git a/tests/transport/compliance.py b/tests/transport/compliance.py
index 50c6d7c..dc7c58b 100644
--- a/tests/transport/compliance.py
+++ b/tests/transport/compliance.py
@@ -22,12 +22,11 @@
from google.auth import exceptions
# .invalid will never resolve, see https://tools.ietf.org/html/rfc2606
-NXDOMAIN = 'test.invalid'
+NXDOMAIN = "test.invalid"
class RequestResponseTests(object):
-
- @pytest.fixture(scope='module')
+ @pytest.fixture(scope="module")
def server(self):
"""Provides a test HTTP server.
@@ -40,20 +39,21 @@
# pylint: disable=unused-variable
# (pylint thinks the flask routes are unusued.)
- @app.route('/basic')
+ @app.route("/basic")
def index():
- header_value = flask.request.headers.get('x-test-header', 'value')
- headers = {'X-Test-Header': header_value}
- return 'Basic Content', http_client.OK, headers
+ header_value = flask.request.headers.get("x-test-header", "value")
+ headers = {"X-Test-Header": header_value}
+ return "Basic Content", http_client.OK, headers
- @app.route('/server_error')
+ @app.route("/server_error")
def server_error():
- return 'Error', http_client.INTERNAL_SERVER_ERROR
+ return "Error", http_client.INTERNAL_SERVER_ERROR
- @app.route('/wait')
+ @app.route("/wait")
def wait():
time.sleep(3)
- return 'Waited'
+ return "Waited"
+
# pylint: enable=unused-variable
server = WSGIServer(application=app.wsgi_app)
@@ -63,44 +63,46 @@
def test_request_basic(self, server):
request = self.make_request()
- response = request(url=server.url + '/basic', method='GET')
+ response = request(url=server.url + "/basic", method="GET")
assert response.status == http_client.OK
- assert response.headers['x-test-header'] == 'value'
- assert response.data == b'Basic Content'
+ assert response.headers["x-test-header"] == "value"
+ assert response.data == b"Basic Content"
def test_request_with_timeout_success(self, server):
request = self.make_request()
- response = request(url=server.url + '/basic', method='GET', timeout=2)
+ response = request(url=server.url + "/basic", method="GET", timeout=2)
assert response.status == http_client.OK
- assert response.headers['x-test-header'] == 'value'
- assert response.data == b'Basic Content'
+ assert response.headers["x-test-header"] == "value"
+ assert response.data == b"Basic Content"
def test_request_with_timeout_failure(self, server):
request = self.make_request()
with pytest.raises(exceptions.TransportError):
- request(url=server.url + '/wait', method='GET', timeout=1)
+ request(url=server.url + "/wait", method="GET", timeout=1)
def test_request_headers(self, server):
request = self.make_request()
response = request(
- url=server.url + '/basic', method='GET', headers={
- 'x-test-header': 'hello world'})
+ url=server.url + "/basic",
+ method="GET",
+ headers={"x-test-header": "hello world"},
+ )
assert response.status == http_client.OK
- assert response.headers['x-test-header'] == 'hello world'
- assert response.data == b'Basic Content'
+ assert response.headers["x-test-header"] == "hello world"
+ assert response.data == b"Basic Content"
def test_request_error(self, server):
request = self.make_request()
- response = request(url=server.url + '/server_error', method='GET')
+ response = request(url=server.url + "/server_error", method="GET")
assert response.status == http_client.INTERNAL_SERVER_ERROR
- assert response.data == b'Error'
+ assert response.data == b"Error"
def test_connection_error(self):
request = self.make_request()
with pytest.raises(exceptions.TransportError):
- request(url='http://{}'.format(NXDOMAIN), method='GET')
+ request(url="http://{}".format(NXDOMAIN), method="GET")
diff --git a/tests/transport/test__http_client.py b/tests/transport/test__http_client.py
index 6b69088..9e7f108 100644
--- a/tests/transport/test__http_client.py
+++ b/tests/transport/test__http_client.py
@@ -26,6 +26,6 @@
def test_non_http(self):
request = self.make_request()
with pytest.raises(exceptions.TransportError) as excinfo:
- request(url='https://{}'.format(compliance.NXDOMAIN), method='GET')
+ request(url="https://{}".format(compliance.NXDOMAIN), method="GET")
- assert excinfo.match('https')
+ assert excinfo.match("https")
diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py
index c98dcff..810d038 100644
--- a/tests/transport/test_grpc.py
+++ b/tests/transport/test_grpc.py
@@ -25,22 +25,23 @@
# pylint: disable=ungrouped-imports
import grpc
import google.auth.transport.grpc
+
HAS_GRPC = True
except ImportError: # pragma: NO COVER
HAS_GRPC = False
-pytestmark = pytest.mark.skipif(not HAS_GRPC, reason='gRPC is unavailable.')
+pytestmark = pytest.mark.skipif(not HAS_GRPC, reason="gRPC is unavailable.")
class CredentialsStub(credentials.Credentials):
- def __init__(self, token='token'):
+ def __init__(self, token="token"):
super(CredentialsStub, self).__init__()
self.token = token
self.expiry = None
def refresh(self, request):
- self.token += '1'
+ self.token += "1"
class TestAuthMetadataPlugin(object):
@@ -48,8 +49,7 @@
credentials = CredentialsStub()
request = mock.create_autospec(transport.Request)
- plugin = google.auth.transport.grpc.AuthMetadataPlugin(
- credentials, request)
+ plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request)
context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
context.method_name = mock.sentinel.method_name
@@ -59,15 +59,15 @@
plugin(context, callback)
callback.assert_called_once_with(
- [(u'authorization', u'Bearer {}'.format(credentials.token))], None)
+ [(u"authorization", u"Bearer {}".format(credentials.token))], None
+ )
def test_call_refresh(self):
credentials = CredentialsStub()
credentials.expiry = datetime.datetime.min + _helpers.CLOCK_SKEW
request = mock.create_autospec(transport.Request)
- plugin = google.auth.transport.grpc.AuthMetadataPlugin(
- credentials, request)
+ plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request)
context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
context.method_name = mock.sentinel.method_name
@@ -76,29 +76,33 @@
plugin(context, callback)
- assert credentials.token == 'token1'
+ assert credentials.token == "token1"
callback.assert_called_once_with(
- [(u'authorization', u'Bearer {}'.format(credentials.token))], None)
+ [(u"authorization", u"Bearer {}".format(credentials.token))], None
+ )
-@mock.patch('grpc.composite_channel_credentials', autospec=True)
-@mock.patch('grpc.metadata_call_credentials', autospec=True)
-@mock.patch('grpc.ssl_channel_credentials', autospec=True)
-@mock.patch('grpc.secure_channel', autospec=True)
+@mock.patch("grpc.composite_channel_credentials", autospec=True)
+@mock.patch("grpc.metadata_call_credentials", autospec=True)
+@mock.patch("grpc.ssl_channel_credentials", autospec=True)
+@mock.patch("grpc.secure_channel", autospec=True)
def test_secure_authorized_channel(
- secure_channel, ssl_channel_credentials, metadata_call_credentials,
- composite_channel_credentials):
+ secure_channel,
+ ssl_channel_credentials,
+ metadata_call_credentials,
+ composite_channel_credentials,
+):
credentials = CredentialsStub()
request = mock.create_autospec(transport.Request)
- target = 'example.com:80'
+ target = "example.com:80"
channel = google.auth.transport.grpc.secure_authorized_channel(
- credentials, request, target, options=mock.sentinel.options)
+ credentials, request, target, options=mock.sentinel.options
+ )
# Check the auth plugin construction.
auth_plugin = metadata_call_credentials.call_args[0][0]
- assert isinstance(
- auth_plugin, google.auth.transport.grpc.AuthMetadataPlugin)
+ assert isinstance(auth_plugin, google.auth.transport.grpc.AuthMetadataPlugin)
assert auth_plugin._credentials == credentials
assert auth_plugin._request == request
@@ -107,35 +111,41 @@
# Check the composite credentials call.
composite_channel_credentials.assert_called_once_with(
- ssl_channel_credentials.return_value,
- metadata_call_credentials.return_value)
+ ssl_channel_credentials.return_value, metadata_call_credentials.return_value
+ )
# Check the channel call.
secure_channel.assert_called_once_with(
- target, composite_channel_credentials.return_value,
- options=mock.sentinel.options)
+ target,
+ composite_channel_credentials.return_value,
+ options=mock.sentinel.options,
+ )
assert channel == secure_channel.return_value
-@mock.patch('grpc.composite_channel_credentials', autospec=True)
-@mock.patch('grpc.metadata_call_credentials', autospec=True)
-@mock.patch('grpc.ssl_channel_credentials', autospec=True)
-@mock.patch('grpc.secure_channel', autospec=True)
+@mock.patch("grpc.composite_channel_credentials", autospec=True)
+@mock.patch("grpc.metadata_call_credentials", autospec=True)
+@mock.patch("grpc.ssl_channel_credentials", autospec=True)
+@mock.patch("grpc.secure_channel", autospec=True)
def test_secure_authorized_channel_explicit_ssl(
- secure_channel, ssl_channel_credentials, metadata_call_credentials,
- composite_channel_credentials):
+ secure_channel,
+ ssl_channel_credentials,
+ metadata_call_credentials,
+ composite_channel_credentials,
+):
credentials = mock.Mock()
request = mock.Mock()
- target = 'example.com:80'
+ target = "example.com:80"
ssl_credentials = mock.Mock()
google.auth.transport.grpc.secure_authorized_channel(
- credentials, request, target, ssl_credentials=ssl_credentials)
+ credentials, request, target, ssl_credentials=ssl_credentials
+ )
# Check the ssl channel call.
assert not ssl_channel_credentials.called
# Check the composite credentials call.
composite_channel_credentials.assert_called_once_with(
- ssl_credentials,
- metadata_call_credentials.return_value)
+ ssl_credentials, metadata_call_credentials.return_value
+ )
diff --git a/tests/transport/test_requests.py b/tests/transport/test_requests.py
index 311992a..0e165ac 100644
--- a/tests/transport/test_requests.py
+++ b/tests/transport/test_requests.py
@@ -29,24 +29,24 @@
def test_timeout(self):
http = mock.create_autospec(requests.Session, instance=True)
request = google.auth.transport.requests.Request(http)
- request(url='http://example.com', method='GET', timeout=5)
+ request(url="http://example.com", method="GET", timeout=5)
- assert http.request.call_args[1]['timeout'] == 5
+ assert http.request.call_args[1]["timeout"] == 5
class CredentialsStub(google.auth.credentials.Credentials):
- def __init__(self, token='token'):
+ def __init__(self, token="token"):
super(CredentialsStub, self).__init__()
self.token = token
def apply(self, headers, token=None):
- headers['authorization'] = self.token
+ headers["authorization"] = self.token
def before_request(self, request, method, url, headers):
self.apply(headers)
def refresh(self, request):
- self.token += '1'
+ self.token += "1"
class AdapterStub(requests.adapters.BaseAdapter):
@@ -77,11 +77,12 @@
class TestAuthorizedHttp(object):
- TEST_URL = 'http://example.com/'
+ TEST_URL = "http://example.com/"
def test_constructor(self):
authed_session = google.auth.transport.requests.AuthorizedSession(
- mock.sentinel.credentials)
+ mock.sentinel.credentials
+ )
assert authed_session.credentials == mock.sentinel.credentials
@@ -90,7 +91,8 @@
auth_request = google.auth.transport.requests.Request(http)
authed_session = google.auth.transport.requests.AuthorizedSession(
- mock.sentinel.credentials, auth_request=auth_request)
+ mock.sentinel.credentials, auth_request=auth_request
+ )
assert authed_session._auth_request == auth_request
@@ -99,32 +101,30 @@
response = make_response()
adapter = AdapterStub([response])
- authed_session = google.auth.transport.requests.AuthorizedSession(
- credentials)
+ authed_session = google.auth.transport.requests.AuthorizedSession(credentials)
authed_session.mount(self.TEST_URL, adapter)
- result = authed_session.request('GET', self.TEST_URL)
+ result = authed_session.request("GET", self.TEST_URL)
assert response == result
assert credentials.before_request.called
assert not credentials.refresh.called
assert len(adapter.requests) == 1
assert adapter.requests[0].url == self.TEST_URL
- assert adapter.requests[0].headers['authorization'] == 'token'
+ assert adapter.requests[0].headers["authorization"] == "token"
def test_request_refresh(self):
credentials = mock.Mock(wraps=CredentialsStub())
final_response = make_response(status=http_client.OK)
# First request will 401, second request will succeed.
- adapter = AdapterStub([
- make_response(status=http_client.UNAUTHORIZED),
- final_response])
+ adapter = AdapterStub(
+ [make_response(status=http_client.UNAUTHORIZED), final_response]
+ )
- authed_session = google.auth.transport.requests.AuthorizedSession(
- credentials)
+ authed_session = google.auth.transport.requests.AuthorizedSession(credentials)
authed_session.mount(self.TEST_URL, adapter)
- result = authed_session.request('GET', self.TEST_URL)
+ result = authed_session.request("GET", self.TEST_URL)
assert result == final_response
assert credentials.before_request.call_count == 2
@@ -132,7 +132,7 @@
assert len(adapter.requests) == 2
assert adapter.requests[0].url == self.TEST_URL
- assert adapter.requests[0].headers['authorization'] == 'token'
+ assert adapter.requests[0].headers["authorization"] == "token"
assert adapter.requests[1].url == self.TEST_URL
- assert adapter.requests[1].headers['authorization'] == 'token1'
+ assert adapter.requests[1].headers["authorization"] == "token1"
diff --git a/tests/transport/test_urllib3.py b/tests/transport/test_urllib3.py
index a119b74..1d7ce5a 100644
--- a/tests/transport/test_urllib3.py
+++ b/tests/transport/test_urllib3.py
@@ -29,35 +29,35 @@
def test_timeout(self):
http = mock.create_autospec(urllib3.PoolManager)
request = google.auth.transport.urllib3.Request(http)
- request(url='http://example.com', method='GET', timeout=5)
+ request(url="http://example.com", method="GET", timeout=5)
- assert http.request.call_args[1]['timeout'] == 5
+ assert http.request.call_args[1]["timeout"] == 5
def test__make_default_http_with_certifi():
http = google.auth.transport.urllib3._make_default_http()
- assert 'cert_reqs' in http.connection_pool_kw
+ assert "cert_reqs" in http.connection_pool_kw
-@mock.patch.object(google.auth.transport.urllib3, 'certifi', new=None)
+@mock.patch.object(google.auth.transport.urllib3, "certifi", new=None)
def test__make_default_http_without_certifi():
http = google.auth.transport.urllib3._make_default_http()
- assert 'cert_reqs' not in http.connection_pool_kw
+ assert "cert_reqs" not in http.connection_pool_kw
class CredentialsStub(google.auth.credentials.Credentials):
- def __init__(self, token='token'):
+ def __init__(self, token="token"):
super(CredentialsStub, self).__init__()
self.token = token
def apply(self, headers, token=None):
- headers['authorization'] = self.token
+ headers["authorization"] = self.token
def before_request(self, request, method, url, headers):
self.apply(headers)
def refresh(self, request):
- self.token += '1'
+ self.token += "1"
class HttpStub(object):
@@ -78,11 +78,12 @@
class TestAuthorizedHttp(object):
- TEST_URL = 'http://example.com'
+ TEST_URL = "http://example.com"
def test_authed_http_defaults(self):
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- mock.sentinel.credentials)
+ mock.sentinel.credentials
+ )
assert authed_http.credentials == mock.sentinel.credentials
assert isinstance(authed_http.http, urllib3.PoolManager)
@@ -93,40 +94,41 @@
http = HttpStub([response])
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- credentials, http=http)
+ credentials, http=http
+ )
- result = authed_http.urlopen('GET', self.TEST_URL)
+ result = authed_http.urlopen("GET", self.TEST_URL)
assert result == response
assert credentials.before_request.called
assert not credentials.refresh.called
assert http.requests == [
- ('GET', self.TEST_URL, None, {'authorization': 'token'}, {})]
+ ("GET", self.TEST_URL, None, {"authorization": "token"}, {})
+ ]
def test_urlopen_refresh(self):
credentials = mock.Mock(wraps=CredentialsStub())
final_response = ResponseStub(status=http_client.OK)
# First request will 401, second request will succeed.
- http = HttpStub([
- ResponseStub(status=http_client.UNAUTHORIZED),
- final_response])
+ http = HttpStub([ResponseStub(status=http_client.UNAUTHORIZED), final_response])
authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- credentials, http=http)
+ credentials, http=http
+ )
- authed_http = authed_http.urlopen('GET', 'http://example.com')
+ authed_http = authed_http.urlopen("GET", "http://example.com")
assert authed_http == final_response
assert credentials.before_request.call_count == 2
assert credentials.refresh.called
assert http.requests == [
- ('GET', self.TEST_URL, None, {'authorization': 'token'}, {}),
- ('GET', self.TEST_URL, None, {'authorization': 'token1'}, {})]
+ ("GET", self.TEST_URL, None, {"authorization": "token"}, {}),
+ ("GET", self.TEST_URL, None, {"authorization": "token1"}, {}),
+ ]
def test_proxies(self):
http = mock.create_autospec(urllib3.PoolManager)
- authed_http = google.auth.transport.urllib3.AuthorizedHttp(
- None, http=http)
+ authed_http = google.auth.transport.urllib3.AuthorizedHttp(None, http=http)
with authed_http:
pass