blob: ab7d45e0486ce8d63d2069ae318a0f0f1941d887 [file] [log] [blame]
# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import datetime
import os
from unittest import mock
import pytest # type: ignore
import responses # type: ignore
from google.auth import _helpers
from google.auth import environment_vars
from google.auth import exceptions
from google.auth import jwt
from google.auth import transport
from google.auth.compute_engine import credentials
from google.auth.transport import requests
from google.oauth2 import _client
# Shared fixtures for the tests in this module.

# Same value as the "exp" claim shown in the SAMPLE_ID_TOKEN payload comment
# below.
SAMPLE_ID_TOKEN_EXP = 1584393400
# Sample ID token in JWT form (three dot-separated base64 segments), built
# from the following header and payload:
# header: {"alg": "RS256", "typ": "JWT", "kid": "1"}
# payload: {"iss": "issuer", "iat": 1584393348, "sub": "subject",
# "exp": 1584393400,"aud": "audience"}
SAMPLE_ID_TOKEN = (
b"eyJhbGciOiAiUlMyNTYiLCAidHlwIjogIkpXVCIsICJraWQiOiAiMSJ9."
b"eyJpc3MiOiAiaXNzdWVyIiwgImlhdCI6IDE1ODQzOTMzNDgsICJzdWIiO"
b"iAic3ViamVjdCIsICJleHAiOiAxNTg0MzkzNDAwLCAiYXVkIjogImF1ZG"
b"llbmNlIn0."
b"OquNjHKhTmlgCk361omRo18F_uY-7y0f_AmLbzW062Q1Zr61HAwHYP5FM"
b"316CK4_0cH8MUNGASsvZc3VqXAqub6PUTfhemH8pFEwBdAdG0LhrNkU0H"
b"WN1YpT55IiQ31esLdL5q-qDsOPpNZJUti1y1lAreM5nIn2srdWzGXGs4i"
b"TRQsn0XkNUCL4RErpciXmjfhMrPkcAjKA-mXQm2fa4jmTlEZFqFmUlym1"
b"ozJ0yf5grjN6AslN4OGvAv1pS-_Ko_pGBS6IQtSBC6vVKCUuBfaqNjykg"
b"bsxbLa6Fp0SYeYwO8ifEnkRvasVpc1WTQqfRB2JCj5pTBDzJpIpFCMmnQ"
)
# Value returned by the patched
# google.auth.metrics.token_request_access_token_mds; the access token test
# expects it in the "x-goog-api-client" header of the metadata request.
ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
"gl-python/3.7 auth/1.1 auth-request-type/at cred-type/mds"
)
# ID token ("auth-request-type/it") variant of the metrics header value above.
# NOTE(review): not referenced in the visible part of this file; presumably
# used by tests further down.
ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = (
"gl-python/3.7 auth/1.1 auth-request-type/it cred-type/mds"
)
# Constructor arguments used by TestCredentials.credentials_fixture to build
# the fully configured credentials, and compared against in the with_* tests.
FAKE_SERVICE_ACCOUNT_EMAIL = "[email protected]"
FAKE_QUOTA_PROJECT_ID = "fake-quota-project"
FAKE_SCOPES = ["scope1", "scope2"]
FAKE_DEFAULT_SCOPES = ["scope3", "scope4"]
FAKE_UNIVERSE_DOMAIN = "fake-universe-domain"
class TestCredentials(object):
    """Unit tests for ``credentials.Credentials`` (``google.auth.compute_engine``).

    Most tests patch the ``_metadata`` helpers, or the trust boundary lookup,
    that the code under test would otherwise call.
    """

    # Both are replaced with fresh instances by ``credentials_fixture``.
    credentials = None
    credentials_with_all_fields = None

    # Trust boundary constants; not referenced by the tests in this class.
    VALID_TRUST_BOUNDARY = {"encodedLocations": "valid-encoded-locations"}
    NO_OP_TRUST_BOUNDARY = {"encodedLocations": ""}
    EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = (
        "https://iamcredentials.googleapis.com/v1/projects/-/"
        "serviceAccounts/default/allowedLocations"
    )

    @pytest.fixture(autouse=True)
    def credentials_fixture(self):
        """Create one bare and one fully configured credential for each test."""
        all_fields = {
            "service_account_email": FAKE_SERVICE_ACCOUNT_EMAIL,
            "quota_project_id": FAKE_QUOTA_PROJECT_ID,
            "scopes": FAKE_SCOPES,
            "default_scopes": FAKE_DEFAULT_SCOPES,
            "universe_domain": FAKE_UNIVERSE_DOMAIN,
        }
        self.credentials = credentials.Credentials()
        self.credentials_with_all_fields = credentials.Credentials(**all_fields)

    def test_get_cred_info(self):
        """``get_cred_info`` reports the metadata server and the default principal."""
        expected_info = {
            "credential_source": "metadata server",
            "credential_type": "VM credentials",
            "principal": "default",
        }
        assert self.credentials.get_cred_info() == expected_info

    def test_default_state(self):
        """Freshly constructed credentials carry only default values."""
        fresh = self.credentials
        assert not fresh.valid
        # Without an expiry the credentials cannot count as expired.
        assert not fresh.expired
        # No scopes were supplied, so they are still required.
        assert fresh.requires_scopes
        # The email stays at the "default" alias.
        assert fresh.service_account_email == "default"
        # No quota project was configured.
        assert not fresh._quota_project_id
        # The universe domain holds its default and is not marked as cached.
        assert fresh._universe_domain == "googleapis.com"
        assert not fresh._universe_domain_cached

    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_success(self, get, utcnow):
        """A refresh stores token, expiry, email and scopes from the metadata replies."""
        account_info = {"email": "[email protected]", "scopes": ["one", "two"]}
        token_payload = {"access_token": "token", "expires_in": 500}
        # The service account info is requested first, the token second.
        get.side_effect = [account_info, token_payload]

        self.credentials.refresh(None)

        refreshed = self.credentials
        # Token and expiry come from the token reply.
        assert refreshed.token == "token"
        expected_expiry = utcnow() + datetime.timedelta(seconds=500)
        assert refreshed.expiry == expected_expiry
        # Email and scopes come from the service account info reply.
        assert refreshed.service_account_email == "[email protected]"
        assert refreshed._scopes == ["one", "two"]
        # Holding an unexpired token makes the credentials valid.
        assert refreshed.valid

    @mock.patch(
        "google.auth.metrics.token_request_access_token_mds",
        return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE,
    )
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_success_with_scopes(self, get, utcnow, mock_metrics_header_value):
        """Explicit scopes win over the metadata scopes and are sent with the request."""
        account_info = {"email": "[email protected]", "scopes": ["one", "two"]}
        token_payload = {"access_token": "token", "expires_in": 500}
        # The service account info is requested first, the token second.
        get.side_effect = [account_info, token_payload]

        scopes = ["three", "four"]
        self.credentials = self.credentials.with_scopes(scopes)
        self.credentials.refresh(None)

        refreshed = self.credentials
        # Token and expiry come from the token reply.
        assert refreshed.token == "token"
        expected_expiry = utcnow() + datetime.timedelta(seconds=500)
        assert refreshed.expiry == expected_expiry
        # The email comes from metadata; the scopes stay the explicit ones.
        assert refreshed.service_account_email == "[email protected]"
        assert refreshed._scopes == scopes
        # Holding an unexpired token makes the credentials valid.
        assert refreshed.valid

        # Inspect the keyword arguments of the most recent metadata call.
        _, call_kwargs = get.call_args
        assert call_kwargs["params"] == {"scopes": "three,four"}
        expected_headers = {
            "x-goog-api-client": ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE
        }
        assert call_kwargs["headers"] == expected_headers

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_no_email(self, get):
        """A metadata reply lacking "email" makes the refresh fail."""
        info_without_email = {"scopes": ["one", "two"]}
        get.return_value = info_without_email

        with pytest.raises(exceptions.RefreshError) as excinfo:
            self.credentials.refresh(None)

        assert excinfo.match(r"missing 'email' field")

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_error(self, get):
        """A transport failure surfaces as ``RefreshError`` with the same message."""
        get.side_effect = exceptions.TransportError("http error")

        with pytest.raises(exceptions.RefreshError) as excinfo:
            self.credentials.refresh(None)

        assert excinfo.match(r"http error")

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_before_request_refreshes(self, get):
        """``before_request`` refreshes credentials that are not yet valid."""
        account_info = {"email": "[email protected]", "scopes": "one two"}
        token_payload = {"access_token": "token", "expires_in": 500}
        # The service account info is requested first, the token second.
        get.side_effect = [account_info, token_payload]

        # Nothing has been fetched yet.
        assert not self.credentials.valid

        transport_request = mock.create_autospec(transport.Request, instance=True)
        self.credentials.before_request(
            transport_request, "GET", "http://example.com?a=1#3", {}
        )

        # The metadata endpoint was hit and the credentials became valid.
        assert get.called
        assert self.credentials.valid

    def test_with_quota_project(self):
        """``with_quota_project`` changes only the quota project."""
        derived = self.credentials_with_all_fields.with_quota_project("project-foo")

        assert derived._quota_project_id == "project-foo"
        assert derived._service_account_email == FAKE_SERVICE_ACCOUNT_EMAIL
        assert derived._scopes == FAKE_SCOPES
        assert derived._default_scopes == FAKE_DEFAULT_SCOPES
        assert derived.universe_domain == FAKE_UNIVERSE_DOMAIN
        assert derived._universe_domain_cached

    def test_with_scopes(self):
        """``with_scopes`` installs the new scopes and drops the default scopes."""
        scopes = ["one", "two"]
        derived = self.credentials_with_all_fields.with_scopes(scopes)

        assert derived._scopes == scopes
        assert derived._quota_project_id == FAKE_QUOTA_PROJECT_ID
        assert derived._service_account_email == FAKE_SERVICE_ACCOUNT_EMAIL
        assert derived._default_scopes is None
        assert derived.universe_domain == FAKE_UNIVERSE_DOMAIN
        assert derived._universe_domain_cached

    def test_with_universe_domain(self):
        """``with_universe_domain`` changes only the universe domain."""
        derived = self.credentials_with_all_fields.with_universe_domain(
            "universe_domain"
        )

        assert derived._scopes == FAKE_SCOPES
        assert derived._quota_project_id == FAKE_QUOTA_PROJECT_ID
        assert derived._service_account_email == FAKE_SERVICE_ACCOUNT_EMAIL
        assert derived._default_scopes == FAKE_DEFAULT_SCOPES
        assert derived.universe_domain == "universe_domain"
        assert derived._universe_domain_cached

    def test_with_trust_boundary(self):
        """``with_trust_boundary`` returns a copy carrying the new boundary."""
        source = self.credentials_with_all_fields
        new_boundary = {"encodedLocations": "new_boundary"}

        derived = source.with_trust_boundary(new_boundary)

        assert derived is not source
        assert derived._trust_boundary == new_boundary
        assert derived._service_account_email == source._service_account_email
        assert derived._quota_project_id == source._quota_project_id
        assert derived._scopes == source._scopes
        assert derived._default_scopes == source._default_scopes

    def test_token_usage_metrics(self):
        """``before_request`` adds the bearer token and the credential-type metric."""
        outgoing_headers = {}
        self.credentials.token = "token"
        self.credentials.expiry = None

        self.credentials.before_request(mock.Mock(), None, None, outgoing_headers)

        assert outgoing_headers["authorization"] == "Bearer token"
        assert outgoing_headers["x-goog-api-client"] == "cred-type/mds"

    @mock.patch(
        "google.auth.compute_engine._metadata.get_universe_domain",
        return_value="fake_universe_domain",
    )
    def test_universe_domain(self, get_universe_domain):
        """The universe domain is fetched once on first access and then cached."""
        subject = self.credentials
        # Initial state: default value, nothing cached.
        assert not subject._universe_domain_cached
        assert subject._universe_domain == "googleapis.com"

        # The first property read fetches the value and caches it.
        assert subject.universe_domain == "fake_universe_domain"
        assert subject._universe_domain == "fake_universe_domain"
        assert subject._universe_domain_cached
        get_universe_domain.assert_called_once()

        # The second read is served from the cache: still exactly one fetch.
        assert subject.universe_domain == "fake_universe_domain"
        get_universe_domain.assert_called_once()

    @mock.patch("google.auth.compute_engine._metadata.get_universe_domain")
    def test_user_provided_universe_domain(self, get_universe_domain):
        """A universe domain given at construction is never looked up."""
        configured = self.credentials_with_all_fields

        assert configured.universe_domain == FAKE_UNIVERSE_DOMAIN
        assert configured._universe_domain_cached
        # The value came from the caller, so the endpoint must stay untouched.
        get_universe_domain.assert_not_called()

    @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_trust_boundary_lookup_skipped_if_env_var_not_true(
        self, mock_metadata_get, mock_lookup_tb
    ):
        """No trust boundary lookup happens when the flag is set to "false"."""
        http_request = mock.Mock()
        subject = self.credentials
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "default", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token", "expires_in": 3600},
        ]
        flag_off = {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "false"}

        with mock.patch.dict(os.environ, flag_off):
            subject.refresh(http_request)

        mock_lookup_tb.assert_not_called()
        assert subject._trust_boundary is None

    @mock.patch("google.oauth2._client._lookup_trust_boundary", autospec=True)
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_trust_boundary_lookup_skipped_if_env_var_missing(
        self, mock_metadata_get, mock_lookup_tb
    ):
        """No trust boundary lookup happens when the environment is empty."""
        http_request = mock.Mock()
        subject = self.credentials
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "default", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token", "expires_in": 3600},
        ]

        # clear=True empties os.environ for the duration of the refresh.
        with mock.patch.dict(os.environ, clear=True):
            subject.refresh(http_request)

        mock_lookup_tb.assert_not_called()
        assert subject._trust_boundary is None

    @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    def test_refresh_trust_boundary_lookup_success(
        self, mock_metadata_get, mock_lookup_tb
    ):
        """With the flag on, refresh looks up, stores and applies the boundary."""
        mock_lookup_tb.return_value = {
            "locations": ["us-central1"],
            "encodedLocations": "0xABC",
        }
        http_request = mock.Mock()
        subject = self.credentials
        # Three metadata calls are expected, in this order: service account
        # info, access token, universe domain.
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "[email protected]", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token", "expires_in": 3600},
            # consumed by get_universe_domain
            "",
        ]
        flag_on = {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}

        with mock.patch.dict(os.environ, flag_on):
            subject.refresh(http_request)

        # All three metadata replies were consumed.
        assert mock_metadata_get.call_count == 3

        # The lookup used the resolved email and the freshly minted token.
        expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
        mock_lookup_tb.assert_called_once_with(
            http_request, expected_url, headers={"authorization": "Bearer mock_token"}
        )

        # The boundary returned by the lookup is stored on the credentials.
        expected_boundary = {
            "locations": ["us-central1"],
            "encodedLocations": "0xABC",
        }
        assert subject._trust_boundary == expected_boundary

        # apply() exposes the encoded locations through x-allowed-locations.
        applied = {}
        subject.apply(applied)
        assert applied["x-allowed-locations"] == "0xABC"

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
    def test_refresh_trust_boundary_lookup_fails_no_cache(
        self, mock_lookup_tb, mock_metadata_get
    ):
        """A failed lookup with nothing cached propagates out of refresh."""
        mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
        http_request = mock.Mock()
        subject = self.credentials
        # Metadata replies: service account info, token, universe domain.
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "[email protected]", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token", "expires_in": 3600},
            # consumed by get_universe_domain
            "",
        ]
        flag_on = {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}

        with mock.patch.dict(os.environ, flag_on):
            with pytest.raises(exceptions.RefreshError, match="Lookup failed"):
                subject.refresh(http_request)

        assert subject._trust_boundary is None
        assert mock_metadata_get.call_count == 3
        mock_lookup_tb.assert_called_once()

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
    def test_refresh_trust_boundary_lookup_fails_with_cached_data(
        self, mock_lookup_tb, mock_metadata_get
    ):
        """A failed lookup keeps the previously cached boundary and does not raise."""
        expected_boundary = {
            "locations": ["us-central1"],
            "encodedLocations": "0xABC",
        }
        flag_on = {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}
        http_request = mock.Mock()
        subject = self.credentials

        # Round one: the lookup succeeds and the boundary is stored.
        mock_lookup_tb.return_value = {
            "locations": ["us-central1"],
            "encodedLocations": "0xABC",
        }
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "[email protected]", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token_1", "expires_in": 3600},
            # consumed by get_universe_domain
            "",
        ]
        with mock.patch.dict(os.environ, flag_on):
            subject.refresh(http_request)

        assert subject._trust_boundary == expected_boundary
        mock_lookup_tb.assert_called_once()

        # Round two: the lookup now fails, but a cached boundary exists.
        mock_lookup_tb.reset_mock()
        mock_lookup_tb.side_effect = exceptions.RefreshError("Lookup failed")
        mock_metadata_get.reset_mock()
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "[email protected]", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token_2", "expires_in": 3600},
            # consumed by get_universe_domain
            "",
        ]
        with mock.patch.dict(os.environ, flag_on):
            # Must not raise: the cached value is kept instead.
            subject.refresh(http_request)

        assert subject._trust_boundary == expected_boundary
        mock_lookup_tb.assert_called_once()

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
    def test_refresh_fetches_no_op_trust_boundary(
        self, mock_lookup_tb, mock_metadata_get
    ):
        """A no-op boundary from the lookup is stored and sent as an empty header."""
        mock_lookup_tb.return_value = {"locations": [], "encodedLocations": "0x0"}
        http_request = mock.Mock()
        subject = self.credentials
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "[email protected]", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token", "expires_in": 3600},
            # consumed by get_universe_domain
            "",
        ]
        flag_on = {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}

        with mock.patch.dict(os.environ, flag_on):
            subject.refresh(http_request)

        expected_boundary = {"locations": [], "encodedLocations": "0x0"}
        assert subject._trust_boundary == expected_boundary
        assert mock_metadata_get.call_count == 3
        expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
        mock_lookup_tb.assert_called_once_with(
            http_request, expected_url, headers={"authorization": "Bearer mock_token"}
        )

        # The header is present but empty for a no-op boundary.
        applied = {}
        subject.apply(applied)
        assert applied["x-allowed-locations"] == ""

    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
    @mock.patch.object(_client, "_lookup_trust_boundary", autospec=True)
    def test_refresh_starts_with_no_op_trust_boundary_skips_lookup(
        self, mock_lookup_tb, mock_metadata_get
    ):
        """Credentials already holding a no-op boundary never repeat the lookup."""
        subject = self.credentials
        # Mark the universe domain as cached so no extra metadata call is made.
        subject._universe_domain_cached = True
        subject._trust_boundary = {"locations": [], "encodedLocations": "0x0"}
        http_request = mock.Mock()
        mock_metadata_get.side_effect = [
            # consumed by _retrieve_info
            {"email": "[email protected]", "scopes": ["scope1"]},
            # consumed by get_service_account_token
            {"access_token": "mock_token", "expires_in": 3600},
        ]
        flag_on = {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"}

        with mock.patch.dict(os.environ, flag_on):
            subject.refresh(http_request)

        # The boundary is still the no-op one.
        expected_boundary = {"locations": [], "encodedLocations": "0x0"}
        assert subject._trust_boundary == expected_boundary

        # No lookup took place.
        mock_lookup_tb.assert_not_called()

        # Only the two metadata calls of the token refresh were made.
        assert mock_metadata_get.call_count == 2

        # The header is present but empty for a no-op boundary.
        applied = {}
        subject.apply(applied)
        assert applied["x-allowed-locations"] == ""

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    @mock.patch(
        "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
    )
    def test_build_trust_boundary_lookup_url_default_email(
        self, mock_get_universe_domain, mock_get_service_account_info
    ):
        """The "default" alias is resolved to a real email before building the URL."""
        mock_get_universe_domain.return_value = "googleapis.com"
        mock_get_service_account_info.return_value = {
            "email": "[email protected]"
        }
        subject = self.credentials
        subject._service_account_email = "default"

        lookup_url = subject._build_trust_boundary_lookup_url()

        mock_get_service_account_info.assert_called_once_with(mock.ANY, "default")
        mock_get_universe_domain.assert_called_once_with(mock.ANY)
        expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
        assert lookup_url == expected_url

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    @mock.patch(
        "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
    )
    def test_build_trust_boundary_lookup_url_explicit_email(
        self, mock_get_universe_domain, mock_get_service_account_info
    ):
        """An explicit email is used as is, without a service account info call."""
        mock_get_universe_domain.return_value = "googleapis.com"
        subject = self.credentials
        subject._service_account_email = FAKE_SERVICE_ACCOUNT_EMAIL

        lookup_url = subject._build_trust_boundary_lookup_url()

        mock_get_service_account_info.assert_not_called()
        mock_get_universe_domain.assert_called_once_with(mock.ANY)
        expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
        assert lookup_url == expected_url

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    @mock.patch(
        "google.auth.compute_engine._metadata.get_universe_domain", autospec=True
    )
    def test_build_trust_boundary_lookup_url_non_default_universe(
        self, mock_get_universe_domain, mock_get_service_account_info
    ):
        """A configured universe domain ends up in the host of the lookup URL."""
        subject = self.credentials_with_all_fields

        lookup_url = subject._build_trust_boundary_lookup_url()

        # Email is explicit and the domain is cached: no metadata calls at all.
        mock_get_service_account_info.assert_not_called()
        mock_get_universe_domain.assert_not_called()
        expected_url = "https://iamcredentials.fake-universe-domain/v1/projects/-/serviceAccounts/[email protected]/allowedLocations"
        assert lookup_url == expected_url

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    def test_build_trust_boundary_lookup_url_get_service_account_info_fails(
        self, mock_get_service_account_info
    ):
        """A transport error while resolving the email becomes a ``RefreshError``."""
        mock_get_service_account_info.side_effect = exceptions.TransportError(
            "Failed to get info"
        )
        subject = self.credentials
        subject._service_account_email = "default"
        expected_message = (
            r"Failed to get service account email for trust boundary lookup: .*"
        )

        with pytest.raises(exceptions.RefreshError, match=expected_message):
            subject._build_trust_boundary_lookup_url()

    @mock.patch(
        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
    )
    def test_build_trust_boundary_lookup_url_no_email(
        self, mock_get_service_account_info
    ):
        """Resolving "default" fails when the metadata reply has no email."""
        mock_get_service_account_info.return_value = {"scopes": ["one", "two"]}
        subject = self.credentials
        subject._service_account_email = "default"

        with pytest.raises(exceptions.RefreshError) as excinfo:
            subject._build_trust_boundary_lookup_url()

        assert excinfo.match(r"missing 'email' field")

    @mock.patch("google.auth.compute_engine._metadata.get")
    @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
    @mock.patch("google.auth._agent_identity_utils.parse_certificate")
    @mock.patch(
        "google.auth._agent_identity_utils.should_request_bound_token",
        return_value=True,
    )
    @mock.patch(
        "google.auth._agent_identity_utils.calculate_certificate_fingerprint",
        return_value="fingerprint",
    )
    def test_refresh_with_agent_identity(
        self,
        mock_calculate_fingerprint,
        mock_should_request,
        mock_parse_certificate,
        mock_get_path,
        mock_metadata_get,
        tmpdir,
    ):
        """The certificate fingerprint is sent when a bound token should be requested."""
        certificate_file = tmpdir.join("cert.pem")
        certificate_file.write(b"cert_content")
        mock_get_path.return_value = str(certificate_file)
        account_info = {"email": "[email protected]", "scopes": ["one", "two"]}
        token_payload = {"access_token": "token", "expires_in": 500}
        mock_metadata_get.side_effect = [account_info, token_payload]

        self.credentials.refresh(None)

        assert self.credentials.token == "token"
        mock_parse_certificate.assert_called_once_with(b"cert_content")
        mock_should_request.assert_called_once_with(mock_parse_certificate.return_value)
        # Inspect the keyword arguments of the most recent metadata call.
        _, call_kwargs = mock_metadata_get.call_args
        expected_params = {
            "scopes": "one,two",
            "bindCertificateFingerprint": "fingerprint",
        }
        assert call_kwargs["params"] == expected_params

    @mock.patch("google.auth.compute_engine._metadata.get")
    @mock.patch("google.auth._agent_identity_utils.get_agent_identity_certificate_path")
    @mock.patch("google.auth._agent_identity_utils.parse_certificate")
    @mock.patch(
        "google.auth._agent_identity_utils.should_request_bound_token",
        return_value=False,
    )
    def test_refresh_with_agent_identity_opt_out_or_not_agent(
        self,
        mock_should_request,
        mock_parse_certificate,
        mock_get_path,
        mock_metadata_get,
        tmpdir,
    ):
        """No fingerprint is sent when a bound token should not be requested."""
        certificate_file = tmpdir.join("cert.pem")
        certificate_file.write(b"cert_content")
        mock_get_path.return_value = str(certificate_file)
        account_info = {"email": "[email protected]", "scopes": ["one", "two"]}
        token_payload = {"access_token": "token", "expires_in": 500}
        mock_metadata_get.side_effect = [account_info, token_payload]

        self.credentials.refresh(None)

        assert self.credentials.token == "token"
        mock_parse_certificate.assert_called_once_with(b"cert_content")
        mock_should_request.assert_called_once_with(mock_parse_certificate.return_value)
        # Inspect the keyword arguments of the most recent metadata call.
        _, call_kwargs = mock_metadata_get.call_args
        assert "bindCertificateFingerprint" not in call_kwargs.get("params", {})
class TestIDTokenCredentials(object):
credentials = None
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_default_state(self, get):
    """New ID token credentials hold no token but already know their signer."""
    metadata_reply = {"email": "[email protected]", "scope": ["one", "two"]}
    get.side_effect = [metadata_reply]
    transport_request = mock.create_autospec(transport.Request, instance=True)

    self.credentials = credentials.IDTokenCredentials(
        request=transport_request, target_audience="https://example.com"
    )
    built = self.credentials

    assert not built.valid
    # Without an expiry the credentials cannot count as expired.
    assert not built.expired
    # The email matches the one served by the mocked metadata call.
    assert built.service_account_email == "[email protected]"
    # A signer exists and reports the same email.
    assert built.signer
    assert built.signer_email == "[email protected]"
    # No quota project was configured.
    assert not built._quota_project_id
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_make_authorization_grant_assertion(self, sign, get, utcnow):
    """The grant assertion is a signed JWT carrying the expected claims."""
    sign.side_effect = [b"signature"]
    metadata_reply = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [metadata_reply]
    transport_request = mock.create_autospec(transport.Request, instance=True)
    self.credentials = credentials.IDTokenCredentials(
        request=transport_request, target_audience="https://audience.com"
    )

    # Build the assertion and decode it without verifying the signature.
    assertion = self.credentials._make_authorization_grant_assertion()
    claims = jwt.decode(assertion, verify=False)

    # The last JWT segment is base64("signature").
    assert assertion.endswith(b".c2lnbmF0dXJl")

    # With the clock pinned to the epoch, iat is 0 and exp is 3600.
    expected_claims = {
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "exp": 3600,
        "iat": 0,
        "iss": "[email protected]",
        "target_audience": "https://audience.com",
    }
    assert claims == expected_claims
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_service_account(self, sign, get, utcnow):
    """An explicit service account email becomes the issuer of the assertion."""
    sign.side_effect = [b"signature"]
    transport_request = mock.create_autospec(transport.Request, instance=True)
    self.credentials = credentials.IDTokenCredentials(
        request=transport_request,
        target_audience="https://audience.com",
        service_account_email="[email protected]",
    )

    # Build the assertion and decode it without verifying the signature.
    assertion = self.credentials._make_authorization_grant_assertion()
    claims = jwt.decode(assertion, verify=False)

    # The last JWT segment is base64("signature").
    assert assertion.endswith(b".c2lnbmF0dXJl")

    # With the clock pinned to the epoch, iat is 0 and exp is 3600.
    expected_claims = {
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "exp": 3600,
        "iat": 0,
        "iss": "[email protected]",
        "target_audience": "https://audience.com",
    }
    assert claims == expected_claims
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_additional_claims(self, sign, get, utcnow):
    """Additional claims are merged into the assertion payload."""
    sign.side_effect = [b"signature"]
    metadata_reply = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [metadata_reply]
    transport_request = mock.create_autospec(transport.Request, instance=True)
    self.credentials = credentials.IDTokenCredentials(
        request=transport_request,
        target_audience="https://audience.com",
        additional_claims={"foo": "bar"},
    )

    # Build the assertion and decode it without verifying the signature.
    assertion = self.credentials._make_authorization_grant_assertion()
    claims = jwt.decode(assertion, verify=False)

    # The last JWT segment is base64("signature").
    assert assertion.endswith(b".c2lnbmF0dXJl")

    # The standard claims plus the extra "foo" claim must all be present.
    expected_claims = {
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "exp": 3600,
        "iat": 0,
        "iss": "[email protected]",
        "target_audience": "https://audience.com",
        "foo": "bar",
    }
    assert claims == expected_claims
def test_token_uri(self):
    """The token URI falls back to the default unless one is passed in."""
    transport_request = mock.create_autospec(transport.Request, instance=True)

    # Without token_uri the module default is used.
    self.credentials = credentials.IDTokenCredentials(
        request=transport_request,
        signer=mock.Mock(),
        service_account_email="[email protected]",
        target_audience="https://audience.com",
    )
    default_uri = credentials._DEFAULT_TOKEN_URI
    assert self.credentials._token_uri == default_uri

    # An explicit token_uri is stored unchanged.
    self.credentials = credentials.IDTokenCredentials(
        request=transport_request,
        signer=mock.Mock(),
        service_account_email="[email protected]",
        target_audience="https://audience.com",
        token_uri="https://example.com/token",
    )
    assert self.credentials._token_uri == "https://example.com/token"
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_target_audience(self, sign, get, utcnow):
    """``with_target_audience`` replaces the audience and keeps a working signer."""
    sign.side_effect = [b"signature"]
    metadata_reply = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [metadata_reply]
    transport_request = mock.create_autospec(transport.Request, instance=True)
    self.credentials = credentials.IDTokenCredentials(
        request=transport_request, target_audience="https://audience.com"
    )
    self.credentials = self.credentials.with_target_audience("https://actually.not")

    # Build the assertion and decode it without verifying the signature.
    assertion = self.credentials._make_authorization_grant_assertion()
    claims = jwt.decode(assertion, verify=False)

    # The last JWT segment is base64("signature").
    assert assertion.endswith(b".c2lnbmF0dXJl")

    # The payload carries the replacement audience.
    expected_claims = {
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "exp": 3600,
        "iat": 0,
        "iss": "[email protected]",
        "target_audience": "https://actually.not",
    }
    assert claims == expected_claims

    # The signer of the derived credentials holds a Request object.
    signer_request = self.credentials._signer._request
    assert isinstance(signer_request, transport.Request)
@responses.activate
def test_with_target_audience_integration(self):
    """Refresh credentials produced by `with_target_audience`.

    No method is mocked here; the HTTP responses are
    registered with `responses` instead.
    """
    json_type = "application/json"

    # Service account details served by the metadata server.
    account_info_url = (
        "http://metadata.google.internal/computeMetadata/v1/instance/"
        "service-accounts/default/?recursive=true"
    )
    responses.add(
        responses.GET,
        account_info_url,
        status=200,
        content_type=json_type,
        json={
            "scopes": "email",
            "email": "[email protected]",
            "aliases": ["default"],
        },
    )

    # Universe domain served by the metadata server.
    universe_domain_url = (
        "http://metadata.google.internal/computeMetadata/v1/universe/"
        "universe-domain"
    )
    responses.add(
        responses.GET,
        universe_domain_url,
        status=200,
        content_type=json_type,
        json={},
    )

    # Access token served by the metadata server.
    access_token_url = (
        "http://metadata.google.internal/computeMetadata/v1/instance/"
        "service-accounts/default/token"
    )
    responses.add(
        responses.GET,
        access_token_url,
        status=200,
        content_type=json_type,
        json={
            "access_token": "some-token",
            "expires_in": 3210,
            "token_type": "Bearer",
        },
    )

    # IAM signBlob endpoint.
    sign_blob_url = (
        "https://iamcredentials.googleapis.com/v1/projects/-/"
        "serviceAccounts/[email protected]:signBlob"
    )
    encoded_signature = base64.b64encode(b"some-signature").decode("utf-8")
    responses.add(
        responses.POST,
        sign_blob_url,
        status=200,
        content_type=json_type,
        json={"keyId": "some-key-id", "signedBlob": encoded_signature},
    )

    # ID token endpoint; the token is three base64 segments joined by dots.
    raw_segments = (b'{"some":"some"}', b'{"exp": 3210}', b"token")
    id_token = ".".join(
        base64.b64encode(segment).decode("utf-8") for segment in raw_segments
    )
    responses.add(
        responses.POST,
        "https://www.googleapis.com/oauth2/v4/token",
        status=200,
        content_type=json_type,
        json={"id_token": id_token, "expiry": 3210},
    )

    self.credentials = credentials.IDTokenCredentials(
        request=requests.Request(),
        service_account_email="[email protected]",
        target_audience="https://audience.com",
    )
    self.credentials = self.credentials.with_target_audience("https://actually.not")
    self.credentials.refresh(requests.Request())

    assert self.credentials.token is not None
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_quota_project(self, sign, get, utcnow):
    """Check `with_quota_project` and the grant assertion built afterwards.

    The clock, the metadata lookup and the IAM signer are all patched.
    """
    sa_info = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [sa_info]
    sign.side_effect = [b"signature"]
    fake_request = mock.create_autospec(transport.Request, instance=True)

    self.credentials = credentials.IDTokenCredentials(
        request=fake_request, target_audience="https://audience.com"
    )
    self.credentials = self.credentials.with_quota_project("project-foo")
    assert self.credentials._quota_project_id == "project-foo"

    # Build the authorization grant and decode it without verification.
    grant = self.credentials._make_authorization_grant_assertion()
    claims = jwt.decode(grant, verify=False)

    # The last JWT segment is b"signature" (the patched signer's return
    # value) encoded in base 64.
    assert grant.endswith(b".c2lnbmF0dXJl")

    # With the clock patched to timestamp 0, "iat" is 0 and "exp" is 3600.
    expected_claims = {
        "aud": "https://www.googleapis.com/oauth2/v4/token",
        "exp": 3600,
        "iat": 0,
        "iss": "[email protected]",
        "target_audience": "https://audience.com",
    }
    assert claims == expected_claims

    # The signer must have been set up with a Request object.
    assert isinstance(self.credentials._signer._request, transport.Request)
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_token_uri(self, sign, get, utcnow):
    """Check that `with_token_uri` returns credentials using the new URI."""
    sa_info = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [sa_info]
    sign.side_effect = [b"signature"]
    fake_request = mock.create_autospec(transport.Request, instance=True)

    self.credentials = credentials.IDTokenCredentials(
        request=fake_request,
        target_audience="https://audience.com",
        token_uri="http://xyz.com",
    )
    # The URI given to the constructor is kept as-is.
    assert self.credentials._token_uri == "http://xyz.com"

    # The derived credentials carry the replacement URI.
    replaced = self.credentials.with_token_uri("http://example.com")
    assert replaced._token_uri == "http://example.com"
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_with_token_uri_exception(self, sign, get, utcnow):
    """Expect `with_token_uri` to raise when the metadata identity endpoint is used."""
    sa_info = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [sa_info]
    sign.side_effect = [b"signature"]
    fake_request = mock.create_autospec(transport.Request, instance=True)

    self.credentials = credentials.IDTokenCredentials(
        request=fake_request,
        target_audience="https://audience.com",
        use_metadata_identity_endpoint=True,
    )
    # No token URI is stored for metadata-identity-endpoint credentials.
    assert self.credentials._token_uri is None

    with pytest.raises(ValueError):
        self.credentials.with_token_uri("http://example.com")
@responses.activate
def test_with_quota_project_integration(self):
    """Refresh credentials produced by `with_quota_project` end to end.

    No credential methods are patched in this test; each HTTP endpoint
    is registered with `responses` and answers with a canned JSON body.
    """
    signed_blob = base64.b64encode(b"some-signature").decode("utf-8")
    # Three base64 segments joined by dots; the middle one carries "exp".
    fake_id_token = ".".join(
        base64.b64encode(segment).decode("utf-8")
        for segment in (b'{"some":"some"}', b'{"exp": 3210}', b"token")
    )

    canned_replies = [
        # information about the service account
        (
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/instance/"
            "service-accounts/default/?recursive=true",
            {
                "scopes": "email",
                "email": "[email protected]",
                "aliases": ["default"],
            },
        ),
        # access token for the credentials
        (
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/instance/"
            "service-accounts/default/token",
            {
                "access_token": "some-token",
                "expires_in": 3210,
                "token_type": "Bearer",
            },
        ),
        # universe domain lookup (empty JSON object)
        (
            responses.GET,
            "http://metadata.google.internal/computeMetadata/v1/universe/"
            "universe-domain",
            {},
        ),
        # sign blob endpoint
        (
            responses.POST,
            "https://iamcredentials.googleapis.com/v1/projects/-/"
            "serviceAccounts/[email protected]:signBlob",
            {"keyId": "some-key-id", "signedBlob": signed_blob},
        ),
        # id token endpoint
        (
            responses.POST,
            "https://www.googleapis.com/oauth2/v4/token",
            {"id_token": fake_id_token, "expiry": 3210},
        ),
    ]
    for http_method, url, body in canned_replies:
        responses.add(
            http_method,
            url,
            status=200,
            content_type="application/json",
            json=body,
        )

    self.credentials = credentials.IDTokenCredentials(
        request=requests.Request(),
        service_account_email="[email protected]",
        target_audience="https://audience.com",
    )
    with_quota = self.credentials.with_quota_project("project-foo")
    self.credentials = with_quota

    with_quota.refresh(requests.Request())
    assert with_quota.token is not None
    assert with_quota._quota_project_id == "project-foo"
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
@mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
    """Refresh with a patched JWT grant and check token, expiry and validity."""
    sa_info = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [sa_info]
    sign.side_effect = [b"signature"]
    # The patched grant hands back (token, expiry, extra data).
    grant_result = ("idtoken", _helpers.utcfromtimestamp(3600), {})
    id_token_jwt_grant.side_effect = [grant_result]
    fake_request = mock.create_autospec(transport.Request, instance=True)

    self.credentials = credentials.IDTokenCredentials(
        request=fake_request, target_audience="https://audience.com"
    )
    self.credentials.refresh(None)

    # Token and expiry come straight from the patched grant.
    assert self.credentials.token == "idtoken"
    assert self.credentials.expiry == _helpers.utcfromtimestamp(3600)

    # The email matches what the patched metadata lookup returned.
    assert self.credentials.service_account_email == "[email protected]"

    # A token is present and, with the clock patched to 0, not expired.
    assert self.credentials.valid
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_refresh_error(self, sign, get, utcnow):
    """Expect `RefreshError` when the token request answers with an HTTP error."""
    sa_info = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [sa_info]
    sign.side_effect = [b"signature"]

    # Answer with a 404 so the request is not retried.
    error_response = mock.Mock(data=b'{"error": "http error"}', status=404)
    fake_request = mock.create_autospec(transport.Request, instance=True)
    fake_request.side_effect = [error_response]

    self.credentials = credentials.IDTokenCredentials(
        request=fake_request, target_audience="https://audience.com"
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        self.credentials.refresh(fake_request)

    assert excinfo.match(r"http error")
@mock.patch(
    "google.auth._helpers.utcnow",
    return_value=_helpers.utcfromtimestamp(0),
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
@mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
def test_before_request_refreshes(self, id_token_jwt_grant, sign, get, utcnow):
    """Check that `before_request` refreshes credentials that are not valid.

    The clock, the metadata lookup, the IAM signer and the JWT grant
    are all patched, so no network traffic takes place.
    """
    get.side_effect = [
        {"email": "[email protected]", "scopes": "one two"}
    ]
    sign.side_effect = [b"signature"]
    id_token_jwt_grant.side_effect = [
        (
            "idtoken",
            _helpers.utcfromtimestamp(3600),
            {},
        )
    ]
    request = mock.create_autospec(transport.Request, instance=True)
    self.credentials = credentials.IDTokenCredentials(
        request=request, target_audience="https://audience.com"
    )
    # Credentials should start as invalid
    assert not self.credentials.valid
    # before_request should cause a refresh
    request = mock.create_autospec(transport.Request, instance=True)
    self.credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
    # The metadata lookup should have been called.
    assert get.called
    # The token grant should have been requested: this is the call that
    # shows a refresh happened, whereas `get.called` on its own does not
    # distinguish a refresh from any other metadata lookup.
    assert id_token_jwt_grant.called
    # Credentials should now be valid.
    assert self.credentials.valid
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
@mock.patch("google.auth.iam.Signer.sign", autospec=True)
def test_sign_bytes(self, sign, get):
    """Check that `sign_bytes` returns what the patched IAM signer produces."""
    sa_info = {"email": "[email protected]", "scopes": ["one", "two"]}
    get.side_effect = [sa_info]
    sign.side_effect = [b"signature"]

    canned_response = mock.Mock(data=b'{"signature": "c2lnbmF0dXJl"}', status=200)
    fake_request = mock.create_autospec(transport.Request, instance=True)
    fake_request.side_effect = [canned_response]

    self.credentials = credentials.IDTokenCredentials(
        request=fake_request, target_audience="https://audience.com"
    )

    signed = self.credentials.sign_bytes(b"some bytes")

    # The result is the value configured on the patched signer.
    assert signed == b"signature"
@mock.patch(
    "google.auth.metrics.token_request_id_token_mds",
    return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE,
)
@mock.patch(
    "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_get_id_token_from_metadata(
    self, get, get_service_account_info, mock_metrics_header_value
):
    """Refresh through the metadata identity endpoint and inspect the result."""
    get_service_account_info.return_value = {"email": "[email protected]"}
    get.return_value = SAMPLE_ID_TOKEN

    id_creds = credentials.IDTokenCredentials(
        mock.Mock(), "audience", use_metadata_identity_endpoint=True
    )
    id_creds.refresh(request=mock.Mock())

    # The metadata request carries the metrics header.
    sent_headers = get.call_args.kwargs["headers"]
    assert sent_headers == {
        "x-goog-api-client": ID_TOKEN_REQUEST_METRICS_HEADER_VALUE
    }

    # Token and expiry come from the sample ID token.
    assert id_creds.token == SAMPLE_ID_TOKEN
    assert id_creds.expiry == _helpers.utcfromtimestamp(SAMPLE_ID_TOKEN_EXP)

    # State expected of metadata-identity-endpoint credentials.
    assert id_creds._use_metadata_identity_endpoint
    assert id_creds._signer is None
    assert id_creds._token_uri is None
    assert id_creds._service_account_email == "[email protected]"
    assert id_creds._target_audience == "audience"

    # These credentials have no signer, so signing is expected to fail.
    with pytest.raises(ValueError):
        id_creds.sign_bytes(b"bytes")
@mock.patch(
    "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
def test_with_target_audience_for_metadata(self, get_service_account_info):
    """Check `with_target_audience` on metadata-identity-endpoint credentials."""
    get_service_account_info.return_value = {"email": "[email protected]"}

    original = credentials.IDTokenCredentials(
        mock.Mock(), "audience", use_metadata_identity_endpoint=True
    )
    retargeted = original.with_target_audience("new_audience")

    # The new audience is applied ...
    assert retargeted._target_audience == "new_audience"
    # ... and the metadata-endpoint state is carried over.
    assert retargeted._use_metadata_identity_endpoint
    assert retargeted._signer is None
    assert retargeted._token_uri is None
    assert retargeted._service_account_email == "[email protected]"
@mock.patch(
    "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
def test_id_token_with_quota_project(self, get_service_account_info):
    """Check `with_quota_project` on metadata-identity-endpoint credentials."""
    get_service_account_info.return_value = {"email": "[email protected]"}

    original = credentials.IDTokenCredentials(
        mock.Mock(), "audience", use_metadata_identity_endpoint=True
    )
    with_quota = original.with_quota_project("project-foo")

    # The quota project is applied ...
    assert with_quota._quota_project_id == "project-foo"
    # ... and the metadata-endpoint state is carried over.
    assert with_quota._use_metadata_identity_endpoint
    assert with_quota._signer is None
    assert with_quota._token_uri is None
    assert with_quota._service_account_email == "[email protected]"
@mock.patch(
    "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_invalid_id_token_from_metadata(self, get, get_service_account_info):
    """Expect `ValueError` when the metadata lookup returns a malformed ID token."""
    get_service_account_info.return_value = {"email": "[email protected]"}
    get.return_value = "invalid_id_token"

    id_creds = credentials.IDTokenCredentials(
        mock.Mock(), "audience", use_metadata_identity_endpoint=True
    )

    with pytest.raises(ValueError):
        id_creds.refresh(request=mock.Mock())
@mock.patch(
    "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
)
@mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
def test_transport_error_from_metadata(self, get, get_service_account_info):
    """Expect a `TransportError` from the metadata lookup to surface as `RefreshError`."""
    get_service_account_info.return_value = {"email": "[email protected]"}
    get.side_effect = exceptions.TransportError("transport error")

    id_creds = credentials.IDTokenCredentials(
        mock.Mock(), "audience", use_metadata_identity_endpoint=True
    )

    with pytest.raises(exceptions.RefreshError) as excinfo:
        id_creds.refresh(request=mock.Mock())

    # The original transport error text is part of the refresh error.
    assert excinfo.match(r"transport error")
def test_get_id_token_from_metadata_constructor(self):
    """Reject options that conflict with the metadata identity endpoint.

    Each of `token_uri`, `signer`, `additional_claims` and
    `service_account_email` is passed together with
    `use_metadata_identity_endpoint=True`; every combination is expected
    to raise `ValueError`.
    """
    with pytest.raises(ValueError):
        credentials.IDTokenCredentials(
            mock.Mock(),
            "audience",
            use_metadata_identity_endpoint=True,
            token_uri="token_uri",
        )
    with pytest.raises(ValueError):
        credentials.IDTokenCredentials(
            mock.Mock(),
            "audience",
            use_metadata_identity_endpoint=True,
            signer=mock.Mock(),
        )
    with pytest.raises(ValueError):
        credentials.IDTokenCredentials(
            mock.Mock(),
            "audience",
            use_metadata_identity_endpoint=True,
            # Claims are a mapping; a set literal ({"key", "value"}) would
            # exercise the check with the wrong type.
            additional_claims={"key": "value"},
        )
    with pytest.raises(ValueError):
        credentials.IDTokenCredentials(
            mock.Mock(),
            "audience",
            use_metadata_identity_endpoint=True,
            service_account_email="[email protected]",
        )