# blob: ec9d13b63f294dd8948fc3149155a584d5966a2c [file] [log] [blame]
# Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import mock
import pytest
from google.auth import _helpers
from google.auth import exceptions
from google.auth import jwt
from google.auth import transport
from google.auth.compute_engine import credentials
class TestCredentials(object):
    """Tests for the Compute Engine ``credentials.Credentials`` class.

    The metadata lookup (``_metadata.get``) is mocked wherever a test
    triggers a refresh, so canned responses are returned in call order.
    """

    # Replaced with a fresh instance before every test by the fixture below.
    credentials = None

    @pytest.fixture(autouse=True)
    def credentials_fixture(self):
        """Give each test its own newly constructed credentials object."""
        self.credentials = credentials.Credentials()

    def test_default_state(self):
        """Freshly constructed credentials hold no token and default info."""
        creds = self.credentials

        assert not creds.valid
        # No expiration has been set yet.
        assert not creds.expired
        # Scopes are not required.
        assert not creds.requires_scopes
        # The service account email has not been populated yet.
        assert creds.service_account_email == "default"

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_success(self, get, utcnow):
        """refresh() stores the token, expiry, email and scopes it fetched."""
        # The first metadata request is for the service account info.
        account_info = {
            "email": "[email protected]",
            "scopes": ["one", "two"],
        }
        # The second metadata request is for the token.
        token_info = {
            "access_token": "token",
            "expires_in": 500,
        }
        get.side_effect = [account_info, token_info]

        # Refresh the credentials.
        self.credentials.refresh(None)
        creds = self.credentials

        # The token and its expiration must reflect the token response.
        assert creds.token == "token"
        lifetime = datetime.timedelta(seconds=500)
        assert creds.expiry == (utcnow() + lifetime)

        # The account details must reflect the service account info response.
        assert creds.service_account_email == "[email protected]"
        assert creds._scopes == ["one", "two"]

        # Having a token that is not expired makes the credentials valid.
        assert creds.valid

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_error(self, get):
        """A transport failure during refresh is raised as a RefreshError."""
        transport_failure = exceptions.TransportError("http error")
        get.side_effect = transport_failure

        with pytest.raises(exceptions.RefreshError) as excinfo:
            self.credentials.refresh(None)

        # The original transport message must be preserved in the new error.
        assert excinfo.match(r"http error")

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_before_request_refreshes(self, get):
        """before_request() refreshes credentials that are not yet valid."""
        # The first metadata request is for the service account info; here the
        # scopes arrive as a single space-separated string.
        account_info = {
            "email": "[email protected]",
            "scopes": "one two",
        }
        # The second metadata request is for the token.
        token_info = {
            "access_token": "token",
            "expires_in": 500,
        }
        get.side_effect = [account_info, token_info]

        # The credentials start out invalid.
        assert not self.credentials.valid

        # before_request should cause a refresh.
        fake_request = mock.create_autospec(transport.Request, instance=True)
        headers = {}
        self.credentials.before_request(
            fake_request, "GET", "http://example.com?a=1#3", headers
        )

        # The metadata endpoint should have been called ...
        assert get.called
        # ... and the credentials should now be valid.
        assert self.credentials.valid
class TestIDTokenCredentials(object):
    """Tests for the Compute Engine ``credentials.IDTokenCredentials`` class.

    Every test replaces the metadata lookup (``_metadata.get``) with a mock;
    tests that build or exchange an assertion also mock ``iam.Signer.sign``
    and, where a token is exchanged, ``_client.id_token_jwt_grant``.
    """

    # Each test assigns the credentials instance it exercises.
    credentials = None

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_default_state(self, get):
        """New credentials have no token but know their email and signer."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://example.com"
        )

        assert not self.credentials.valid
        # Expiration hasn't been set yet
        assert not self.credentials.expired
        # Service account email matches the one in the metadata response
        assert self.credentials.service_account_email == "[email protected]"
        # Signer is initialized
        assert self.credentials.signer
        assert self.credentials.signer_email == "[email protected]"

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_make_authorization_grant_assertion(self, sign, get, utcnow):
        """The grant assertion is a signed JWT carrying the expected claims."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims in the decoded payload. utcnow is pinned to the
        # epoch, hence "iat" of 0.
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
        }

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_with_service_account(self, sign, get, utcnow):
        """An explicitly supplied service account email becomes the issuer."""
        # No metadata response is queued here: the email is passed directly.
        sign.side_effect = [b"signature"]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request,
            target_audience="https://audience.com",
            service_account_email="[email protected]",
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims in the decoded payload.
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
        }

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_additional_claims(self, sign, get, utcnow):
        """Additional claims are merged into the assertion payload."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request,
            target_audience="https://audience.com",
            additional_claims={"foo": "bar"},
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims in the decoded payload, including the extra one.
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://audience.com",
            "foo": "bar",
        }

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_with_target_audience(self, sign, get, utcnow):
        """with_target_audience() yields credentials using the new audience."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )
        self.credentials = self.credentials.with_target_audience(
            "https://actually.not"
        )

        # Generate authorization grant:
        token = self.credentials._make_authorization_grant_assertion()
        payload = jwt.decode(token, verify=False)

        # The JWT token signature is 'signature' encoded in base 64:
        assert token.endswith(b".c2lnbmF0dXJl")

        # Check the claims in the decoded payload; the audience must be the
        # replacement one, not the one given to the constructor.
        assert payload == {
            "aud": "https://www.googleapis.com/oauth2/v4/token",
            "exp": 3600,
            "iat": 0,
            "iss": "[email protected]",
            "target_audience": "https://actually.not",
        }

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
    def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
        """refresh() stores the ID token and expiry returned by the grant."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]
        id_token_jwt_grant.side_effect = [
            ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
        ]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Refresh credentials
        self.credentials.refresh(None)

        # Check that the credentials have the token and proper expiration
        assert self.credentials.token == "idtoken"
        assert self.credentials.expiry == (datetime.datetime.utcfromtimestamp(3600))

        # Check the credential info
        assert self.credentials.service_account_email == "[email protected]"

        # Check that the credentials are valid (have a token and are not
        # expired)
        assert self.credentials.valid

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_refresh_error(self, sign, get, utcnow):
        """An error response during refresh is raised as a RefreshError."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        # The mocked transport answers with a single HTTP 500 error response.
        request = mock.create_autospec(transport.Request, instance=True)
        response = mock.Mock()
        response.data = b'{"error": "http error"}'
        response.status = 500
        request.side_effect = [response]

        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        with pytest.raises(exceptions.RefreshError) as excinfo:
            self.credentials.refresh(request)

        assert excinfo.match(r"http error")

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.utcfromtimestamp(0),
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
    def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow):
        """before_request() refreshes credentials that are not yet valid."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": "one two"}
        ]
        sign.side_effect = [b"signature"]
        id_token_jwt_grant.side_effect = [
            ("idtoken", datetime.datetime.utcfromtimestamp(3600), {})
        ]
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Credentials should start as invalid
        assert not self.credentials.valid

        # before_request should cause a refresh
        request = mock.create_autospec(transport.Request, instance=True)
        self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})

        # The metadata lookup should've been called. Only one response is
        # queued for it, so this alone does not show that a refresh happened.
        assert get.called
        # The token grant is what supplies the ID token, so it must have been
        # invoked by the refresh.
        assert id_token_jwt_grant.called
        # Credentials should now be valid.
        assert self.credentials.valid

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
    def test_sign_bytes(self, sign, get):
        """sign_bytes() returns the signature produced by the signer."""
        get.side_effect = [
            {"email": "[email protected]", "scopes": ["one", "two"]}
        ]
        sign.side_effect = [b"signature"]

        # NOTE(review): Signer.sign is mocked above, so this canned HTTP
        # response is presumably never consumed - confirm before removing it.
        request = mock.create_autospec(transport.Request, instance=True)
        response = mock.Mock()
        response.data = b'{"signature": "c2lnbmF0dXJl"}'
        response.status = 200
        request.side_effect = [response]

        self.credentials = credentials.IDTokenCredentials(
            request=request, target_audience="https://audience.com"
        )

        # Sign an arbitrary payload:
        signature = self.credentials.sign_bytes(b"some bytes")

        # The value returned by the (mocked) signer is passed back unchanged:
        assert signature == b"signature"