| # Copyright 2018 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import copy |
| import datetime |
| import http.client as http_client |
| import json |
| import os |
| from unittest import mock |
| |
| import pytest # type: ignore |
| |
| from google.auth import _helpers |
| from google.auth import credentials as auth_credentials |
| from google.auth import crypt |
| from google.auth import environment_vars |
| from google.auth import exceptions |
| from google.auth import impersonated_credentials |
| from google.auth import transport |
| from google.auth.impersonated_credentials import Credentials |
| from google.oauth2 import credentials |
| from google.oauth2 import service_account |
| |
| DATA_DIR = os.path.join(os.path.dirname(__file__), "", "data") |
| |
| with open(os.path.join(DATA_DIR, "privatekey.pem"), "rb") as fh: |
| PRIVATE_KEY_BYTES = fh.read() |
| |
| SERVICE_ACCOUNT_JSON_FILE = os.path.join(DATA_DIR, "service_account.json") |
| IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE = os.path.join( |
| DATA_DIR, "impersonated_service_account_authorized_user_source.json" |
| ) |
| |
| ID_TOKEN_DATA = ( |
| "eyJhbGciOiJSUzI1NiIsImtpZCI6ImRmMzc1ODkwOGI3OTIyOTNhZDk3N2Ew" |
| "Yjk5MWQ5OGE3N2Y0ZWVlY2QiLCJ0eXAiOiJKV1QifQ.eyJhdWQiOiJodHRwc" |
| "zovL2Zvby5iYXIiLCJhenAiOiIxMDIxMDE1NTA4MzQyMDA3MDg1NjgiLCJle" |
| "HAiOjE1NjQ0NzUwNTEsImlhdCI6MTU2NDQ3MTQ1MSwiaXNzIjoiaHR0cHM6L" |
| "y9hY2NvdW50cy5nb29nbGUuY29tIiwic3ViIjoiMTAyMTAxNTUwODM0MjAwN" |
| "zA4NTY4In0.redacted" |
| ) |
| ID_TOKEN_EXPIRY = 1564475051 |
| |
| with open(SERVICE_ACCOUNT_JSON_FILE, "rb") as fh: |
| SERVICE_ACCOUNT_INFO = json.load(fh) |
| |
| with open(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_FILE, "rb") as fh: |
| IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO = json.load(fh) |
| |
| SIGNER = crypt.RSASigner.from_string(PRIVATE_KEY_BYTES, "1") |
| TOKEN_URI = "https://example.com/oauth2/token" |
| |
| ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( |
| "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp" |
| ) |
| ID_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( |
| "gl-python/3.7 auth/1.1 auth-request-type/it cred-type/imp" |
| ) |
| |
| |
| @pytest.fixture |
| def mock_donor_credentials(): |
| with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant: |
| grant.return_value = ( |
| "source token", |
| _helpers.utcnow() + datetime.timedelta(seconds=500), |
| {}, |
| ) |
| yield grant |
| |
| |
| @pytest.fixture |
| def mock_dwd_credentials(): |
| with mock.patch("google.oauth2._client.jwt_grant", autospec=True) as grant: |
| grant.return_value = ( |
| "1/fFAGRNJasdfz70BzhT3Zg", |
| _helpers.utcnow() + datetime.timedelta(seconds=500), |
| {}, |
| ) |
| yield grant |
| |
| |
| class MockResponse: |
| def __init__(self, json_data, status_code): |
| self.json_data = json_data |
| self.status_code = status_code |
| |
| def json(self): |
| return self.json_data |
| |
| |
| @pytest.fixture |
| def mock_authorizedsession_sign(): |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.request", autospec=True |
| ) as auth_session: |
| data = {"keyId": "1", "signedBlob": "c2lnbmF0dXJl"} |
| auth_session.return_value = MockResponse(data, http_client.OK) |
| yield auth_session |
| |
| |
| @pytest.fixture |
| def mock_authorizedsession_idtoken(): |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.request", autospec=True |
| ) as auth_session: |
| data = {"token": ID_TOKEN_DATA} |
| auth_session.return_value = MockResponse(data, http_client.OK) |
| yield auth_session |
| |
| |
| class TestImpersonatedCredentials(object): |
| SERVICE_ACCOUNT_EMAIL = "[email protected]" |
| TARGET_PRINCIPAL = "[email protected]" |
| TARGET_SCOPES = ["https://www.googleapis.com/auth/devstorage.read_only"] |
| # DELEGATES: List[str] = [] |
| # Because Python 2.7: |
| DELEGATES = [] # type: ignore |
| LIFETIME = 3600 |
| NO_OP_TRUST_BOUNDARY = { |
| "locations": auth_credentials.NO_OP_TRUST_BOUNDARY_LOCATIONS, |
| "encodedLocations": auth_credentials.NO_OP_TRUST_BOUNDARY_ENCODED_LOCATIONS, |
| } |
| VALID_TRUST_BOUNDARY = { |
| "locations": ["us-central1", "us-east1"], |
| "encodedLocations": "0xVALIDHEX", |
| } |
| EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE = ( |
| "https://iamcredentials.googleapis.com/v1/projects/-" |
| "/serviceAccounts/[email protected]/allowedLocations" |
| ) |
| FAKE_UNIVERSE_DOMAIN = "universe.foo" |
| SOURCE_CREDENTIALS = service_account.Credentials( |
| SIGNER, SERVICE_ACCOUNT_EMAIL, TOKEN_URI, trust_boundary=NO_OP_TRUST_BOUNDARY |
| ) |
| USER_SOURCE_CREDENTIALS = credentials.Credentials(token="ABCDE") |
| IAM_ENDPOINT_OVERRIDE = ( |
| "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" |
| + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) |
| ) |
| |
| def make_credentials( |
| self, |
| source_credentials=SOURCE_CREDENTIALS, |
| lifetime=LIFETIME, |
| target_principal=TARGET_PRINCIPAL, |
| subject=None, |
| iam_endpoint_override=None, |
| trust_boundary=None, # Align with Credentials class default |
| ): |
| return Credentials( |
| source_credentials=source_credentials, |
| target_principal=target_principal, |
| target_scopes=self.TARGET_SCOPES, |
| delegates=self.DELEGATES, |
| lifetime=lifetime, |
| subject=subject, |
| iam_endpoint_override=iam_endpoint_override, |
| trust_boundary=trust_boundary, |
| ) |
| |
| def test_from_impersonated_service_account_info(self): |
| credentials = ( |
| impersonated_credentials.Credentials.from_impersonated_service_account_info( |
| IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO |
| ) |
| ) |
| assert isinstance(credentials, impersonated_credentials.Credentials) |
| |
| def test_from_impersonated_service_account_info_with_trust_boundary(self): |
| info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) |
| info["trust_boundary"] = self.VALID_TRUST_BOUNDARY |
| credentials = ( |
| impersonated_credentials.Credentials.from_impersonated_service_account_info( |
| info |
| ) |
| ) |
| assert isinstance(credentials, impersonated_credentials.Credentials) |
| assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| |
| def test_from_impersonated_service_account_info_with_invalid_source_credentials_type( |
| self, |
| ): |
| info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) |
| assert "source_credentials" in info |
| # Set the source_credentials to an invalid type |
| info["source_credentials"]["type"] = "invalid_type" |
| with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: |
| impersonated_credentials.Credentials.from_impersonated_service_account_info( |
| info |
| ) |
| assert excinfo.match( |
| "source credential of type {} is not supported".format("invalid_type") |
| ) |
| |
| def test_from_impersonated_service_account_info_with_invalid_impersonation_url( |
| self, |
| ): |
| info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) |
| info["service_account_impersonation_url"] = "invalid_url" |
| with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: |
| impersonated_credentials.Credentials.from_impersonated_service_account_info( |
| info |
| ) |
| assert excinfo.match(r"Cannot extract target principal from") |
| |
| def test_from_impersonated_service_account_info_with_scopes(self): |
| info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) |
| info["scopes"] = ["scope1", "scope2"] |
| credentials = ( |
| impersonated_credentials.Credentials.from_impersonated_service_account_info( |
| info |
| ) |
| ) |
| assert credentials._target_scopes == ["scope1", "scope2"] |
| |
| def test_from_impersonated_service_account_info_with_scopes_param(self): |
| info = copy.deepcopy(IMPERSONATED_SERVICE_ACCOUNT_AUTHORIZED_USER_SOURCE_INFO) |
| info["scopes"] = ["scope_from_info_1", "scope_from_info_2"] |
| scopes_param = ["scope_from_param_1", "scope_from_param_2"] |
| credentials = ( |
| impersonated_credentials.Credentials.from_impersonated_service_account_info( |
| info, scopes=scopes_param |
| ) |
| ) |
| assert credentials._target_scopes == scopes_param |
| |
| def test_get_cred_info(self): |
| credentials = self.make_credentials() |
| assert not credentials.get_cred_info() |
| |
| credentials._cred_file_path = "/path/to/file" |
| assert credentials.get_cred_info() == { |
| "credential_source": "/path/to/file", |
| "credential_type": "impersonated credentials", |
| "principal": "[email protected]", |
| } |
| |
| def test_universe_domain_matching_source(self): |
| source_credentials = service_account.Credentials( |
| SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" |
| ) |
| credentials = self.make_credentials(source_credentials=source_credentials) |
| assert credentials.universe_domain == "foo.bar" |
| |
| def test__make_copy_get_cred_info(self): |
| credentials = self.make_credentials() |
| credentials._cred_file_path = "/path/to/file" |
| cred_copy = credentials._make_copy() |
| assert cred_copy._cred_file_path == "/path/to/file" |
| |
| def test_make_from_user_credentials(self): |
| credentials = self.make_credentials( |
| source_credentials=self.USER_SOURCE_CREDENTIALS |
| ) |
| assert not credentials.valid |
| assert credentials.expired |
| |
| def test_default_state(self): |
| credentials = self.make_credentials() |
| assert not credentials.valid |
| assert credentials.expired |
| |
| def test_make_from_service_account_self_signed_jwt(self): |
| source_credentials = service_account.Credentials( |
| SIGNER, self.SERVICE_ACCOUNT_EMAIL, TOKEN_URI, always_use_jwt_access=True |
| ) |
| credentials = self.make_credentials(source_credentials=source_credentials) |
| # test the source credential don't lose self signed jwt setting |
| assert credentials._source_credentials._always_use_jwt_access |
| assert credentials._source_credentials._jwt_credentials |
| |
| def make_request( |
| self, |
| data, |
| status=http_client.OK, |
| headers=None, |
| side_effect=None, |
| use_data_bytes=True, |
| ): |
| response = mock.create_autospec(transport.Response, instance=False) |
| response.status = status |
| response.data = _helpers.to_bytes(data) if use_data_bytes else data |
| response.headers = headers or {} |
| |
| request = mock.create_autospec(transport.Request, instance=False) |
| request.side_effect = side_effect |
| request.return_value = response |
| |
| return request |
| |
| def test_token_usage_metrics(self): |
| credentials = self.make_credentials() |
| credentials.token = "token" |
| credentials.expiry = None |
| |
| headers = {} |
| credentials.before_request(mock.Mock(), None, None, headers) |
| assert headers["authorization"] == "Bearer token" |
| assert headers["x-goog-api-client"] == "cred-type/imp" |
| |
| @pytest.mark.parametrize("use_data_bytes", [True, False]) |
| @mock.patch("google.oauth2._client._lookup_trust_boundary") |
| def test_refresh_success( |
| self, mock_lookup_trust_boundary, use_data_bytes, mock_donor_credentials |
| ): |
| # Start with no boundary. |
| credentials = self.make_credentials(lifetime=None, trust_boundary=None) |
| token = "token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), |
| status=http_client.OK, |
| use_data_bytes=use_data_bytes, |
| ) |
| |
| # Mock the trust boundary lookup to return a valid value. |
| mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY |
| |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ), mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ): |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| assert ( |
| request.call_args.kwargs["headers"]["x-goog-api-client"] |
| == ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE |
| ) |
| |
| # Verify that the x-allowed-locations header from the source credential |
| # was applied. The source credential has a NO_OP boundary, so the |
| # header should be an empty string. |
| request_kwargs = request.call_args[1] |
| assert "headers" in request_kwargs |
| assert "x-allowed-locations" in request_kwargs["headers"] |
| assert request_kwargs["headers"]["x-allowed-locations"] == "" |
| |
| # Verify trust boundary was set. |
| assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| |
| # Verify the mock was called with the correct URL. |
| mock_lookup_trust_boundary.assert_called_once_with( |
| request, |
| self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, |
| headers={"authorization": "Bearer token"}, |
| ) |
| |
| # Verify x-allowed-locations header is set correctly by apply(). |
| headers_applied = {} |
| credentials.apply(headers_applied) |
| assert ( |
| headers_applied["x-allowed-locations"] |
| == self.VALID_TRUST_BOUNDARY["encodedLocations"] |
| ) |
| |
| def test_refresh_source_creds_no_trust_boundary(self): |
| # Use a source credential that does not support trust boundaries. |
| source_credentials = credentials.Credentials(token="source_token") |
| creds = self.make_credentials(source_credentials=source_credentials) |
| token = "impersonated_token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| creds.refresh(request) |
| |
| # Verify that the x-allowed-locations header was NOT applied because |
| # the source credential does not support trust boundaries. |
| request_kwargs = request.call_args[1] |
| assert "x-allowed-locations" not in request_kwargs["headers"] |
| |
| @mock.patch("google.oauth2._client._lookup_trust_boundary") |
| def test_refresh_trust_boundary_lookup_fails_no_cache( |
| self, mock_lookup_trust_boundary, mock_donor_credentials |
| ): |
| # Start with no trust boundary |
| credentials = self.make_credentials(lifetime=None, trust_boundary=None) |
| token = "token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| # Mock the trust boundary lookup to raise an error |
| mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( |
| "Lookup failed" |
| ) |
| |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ), pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(request) |
| |
| assert "Lookup failed" in str(excinfo.value) |
| assert credentials._trust_boundary is None # Still no trust boundary |
| mock_lookup_trust_boundary.assert_called_once() |
| |
| @mock.patch("google.oauth2._client._lookup_trust_boundary") |
| def test_refresh_fetches_no_op_trust_boundary( |
| self, mock_lookup_trust_boundary, mock_donor_credentials |
| ): |
| # Start with no trust boundary |
| credentials = self.make_credentials(lifetime=None, trust_boundary=None) |
| token = "token" |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| mock_lookup_trust_boundary.return_value = ( |
| self.NO_OP_TRUST_BOUNDARY |
| ) # Mock returns NO_OP |
| |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ), mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ): |
| credentials.refresh(request) |
| |
| assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY |
| mock_lookup_trust_boundary.assert_called_once_with( |
| request, |
| self.EXPECTED_TRUST_BOUNDARY_LOOKUP_URL_DEFAULT_UNIVERSE, |
| headers={"authorization": "Bearer token"}, |
| ) |
| headers_applied = {} |
| credentials.apply(headers_applied) |
| assert headers_applied["x-allowed-locations"] == "" |
| |
| @mock.patch("google.oauth2._client._lookup_trust_boundary") |
| def test_refresh_skips_trust_boundary_lookup_non_default_universe( |
| self, mock_lookup_trust_boundary |
| ): |
| # Create source credentials with a non-default universe domain |
| source_credentials = service_account.Credentials( |
| SIGNER, |
| "[email protected]", |
| TOKEN_URI, |
| universe_domain=self.FAKE_UNIVERSE_DOMAIN, |
| ) |
| # Create impersonated credentials using the non-default source credentials |
| credentials = self.make_credentials(source_credentials=source_credentials) |
| |
| # Mock the IAM credentials API call for generateAccessToken |
| token = "token" |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| # Ensure trust boundary lookup was not called |
| mock_lookup_trust_boundary.assert_not_called() |
| # Verify that x-allowed-locations header is not set by apply() |
| headers_applied = {} |
| credentials.apply(headers_applied) |
| assert "x-allowed-locations" not in headers_applied |
| |
| @mock.patch("google.oauth2._client._lookup_trust_boundary") |
| def test_refresh_starts_with_no_op_trust_boundary_skips_lookup( |
| self, mock_lookup_trust_boundary, mock_donor_credentials |
| ): |
| credentials = self.make_credentials( |
| lifetime=None, trust_boundary=self.NO_OP_TRUST_BOUNDARY |
| ) # Start with NO_OP |
| token = "token" |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ), mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ): |
| credentials.refresh(request) |
| |
| # Verify trust boundary remained NO_OP |
| assert credentials._trust_boundary == self.NO_OP_TRUST_BOUNDARY |
| |
| # Lookup should be skipped |
| mock_lookup_trust_boundary.assert_not_called() |
| |
| # Verify that an empty header was added. |
| headers_applied = {} |
| credentials.apply(headers_applied) |
| assert headers_applied["x-allowed-locations"] == "" |
| |
| @mock.patch("google.oauth2._client._lookup_trust_boundary") |
| def test_refresh_trust_boundary_lookup_fails_with_cached_data2( |
| self, mock_lookup_trust_boundary, mock_donor_credentials |
| ): |
| # Start with no trust boundary |
| credentials = self.make_credentials(lifetime=None, trust_boundary=None) |
| token = "token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| # First refresh: Successfully fetch a valid trust boundary. |
| mock_lookup_trust_boundary.return_value = self.VALID_TRUST_BOUNDARY |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ), mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ): |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| # Verify trust boundary was set. |
| assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| mock_lookup_trust_boundary.assert_called_once() |
| |
| # Second refresh: Mock lookup to fail, but expect cached data to be preserved. |
| mock_lookup_trust_boundary.reset_mock() |
| mock_lookup_trust_boundary.side_effect = exceptions.RefreshError( |
| "Lookup failed" |
| ) |
| |
| with mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| mock_lookup_trust_boundary.assert_called_once() |
| |
| @pytest.mark.parametrize("use_data_bytes", [True, False]) |
| def test_refresh_with_subject_success(self, use_data_bytes, mock_dwd_credentials): |
| credentials = self.make_credentials(subject="[email protected]", lifetime=None) |
| |
| response_body = {"signedJwt": "example_signed_jwt"} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), |
| status=http_client.OK, |
| use_data_bytes=use_data_bytes, |
| ) |
| |
| with mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ): |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token == "1/fFAGRNJasdfz70BzhT3Zg" |
| |
| @pytest.mark.parametrize("use_data_bytes", [True, False]) |
| def test_refresh_success_nonGdu(self, use_data_bytes, mock_donor_credentials): |
| source_credentials = service_account.Credentials( |
| SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" |
| ) |
| credentials = self.make_credentials( |
| lifetime=None, source_credentials=source_credentials |
| ) |
| token = "token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), |
| status=http_client.OK, |
| use_data_bytes=use_data_bytes, |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| # Confirm override endpoint used. |
| request_kwargs = request.call_args[1] |
| assert ( |
| request_kwargs["url"] |
| == "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/[email protected]:generateAccessToken" |
| ) |
| |
| @pytest.mark.parametrize("use_data_bytes", [True, False]) |
| def test_refresh_success_iam_endpoint_override( |
| self, use_data_bytes, mock_donor_credentials |
| ): |
| credentials = self.make_credentials( |
| lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE |
| ) |
| token = "token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), |
| status=http_client.OK, |
| use_data_bytes=use_data_bytes, |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| # Confirm override endpoint used. |
| request_kwargs = request.call_args[1] |
| assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE |
| |
| @pytest.mark.parametrize("time_skew", [150, -150]) |
| def test_refresh_source_credentials(self, time_skew): |
| credentials = self.make_credentials(lifetime=None) |
| |
| # Source credentials is refreshed only if it is expired within |
| # _helpers.REFRESH_THRESHOLD from now. We add a time_skew to the expiry, so |
| # source credentials is refreshed only if time_skew <= 0. |
| credentials._source_credentials.expiry = ( |
| _helpers.utcnow() |
| + _helpers.REFRESH_THRESHOLD |
| + datetime.timedelta(seconds=time_skew) |
| ) |
| credentials._source_credentials.token = "Token" |
| |
| with mock.patch( |
| "google.oauth2.service_account.Credentials._perform_refresh_token", |
| autospec=True, |
| ) as source_cred_refresh_token: |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) |
| + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": "token", "expireTime": expire_time} |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| credentials.refresh(request) |
| |
| if time_skew <= 0: |
| source_cred_refresh_token.assert_called_once() |
| else: |
| source_cred_refresh_token.assert_not_called() |
| |
| def test_refresh_failure_malformed_expire_time(self, mock_donor_credentials): |
| credentials = self.make_credentials(lifetime=None) |
| token = "token" |
| |
| expire_time = (_helpers.utcnow() + datetime.timedelta(seconds=500)).isoformat( |
| "T" |
| ) |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match(impersonated_credentials._REFRESH_ERROR) |
| |
| assert not credentials.valid |
| assert credentials.expired |
| |
| def test_refresh_failure_unauthorzed(self, mock_donor_credentials): |
| credentials = self.make_credentials(lifetime=None) |
| |
| response_body = { |
| "error": { |
| "code": 403, |
| "message": "The caller does not have permission", |
| "status": "PERMISSION_DENIED", |
| } |
| } |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.UNAUTHORIZED |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match(impersonated_credentials._REFRESH_ERROR) |
| |
| assert not credentials.valid |
| assert credentials.expired |
| |
| def test_refresh_failure(self): |
| credentials = self.make_credentials(lifetime=None) |
| credentials.expiry = None |
| credentials.token = "token" |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience="audience" |
| ) |
| |
| response = mock.create_autospec(transport.Response, instance=False) |
| response.status_code = http_client.UNAUTHORIZED |
| response.json = mock.Mock(return_value="failed to get ID token") |
| |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.post", |
| return_value=response, |
| ): |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| id_creds.refresh(None) |
| |
| assert excinfo.match("Error getting ID token") |
| |
| def test_refresh_failure_missing_token_in_200_response(self): |
| credentials = self.make_credentials(lifetime=None) |
| credentials.expiry = None |
| credentials.token = "token" |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience="audience" |
| ) |
| |
| # Response has 200 OK status but is missing the "token" field |
| response = mock.create_autospec(transport.Response, instance=False) |
| response.status_code = http_client.OK |
| response.json = mock.Mock(return_value={"not_token": "something"}) |
| |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.post", |
| return_value=response, |
| ): |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| id_creds.refresh(None) |
| |
| assert excinfo.match("No ID token in response") |
| |
| def test_refresh_failure_http_error(self, mock_donor_credentials): |
| credentials = self.make_credentials(lifetime=None) |
| |
| response_body = {} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.HTTPException |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match(impersonated_credentials._REFRESH_ERROR) |
| |
| assert not credentials.valid |
| assert credentials.expired |
| |
| def test_refresh_failure_subject_with_nondefault_domain( |
| self, mock_donor_credentials |
| ): |
| source_credentials = service_account.Credentials( |
| SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" |
| ) |
| credentials = self.make_credentials( |
| source_credentials=source_credentials, subject="[email protected]" |
| ) |
| |
| expire_time = (_helpers.utcnow().replace(microsecond=0)).isoformat("T") + "Z" |
| response_body = {"accessToken": "token", "expireTime": expire_time} |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| with pytest.raises(exceptions.GoogleAuthError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match( |
| "Domain-wide delegation is not supported in universes other " |
| + "than googleapis.com" |
| ) |
| |
| assert not credentials.valid |
| assert credentials.expired |
| |
| def test_expired(self): |
| credentials = self.make_credentials(lifetime=None) |
| assert credentials.expired |
| |
| def test_signer(self): |
| credentials = self.make_credentials() |
| assert isinstance(credentials.signer, impersonated_credentials.Credentials) |
| |
| def test_signer_email(self): |
| credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL) |
| assert credentials.signer_email == self.TARGET_PRINCIPAL |
| |
| def test_service_account_email(self): |
| credentials = self.make_credentials(target_principal=self.TARGET_PRINCIPAL) |
| assert credentials.service_account_email == self.TARGET_PRINCIPAL |
| |
| def test_sign_bytes(self, mock_donor_credentials, mock_authorizedsession_sign): |
| credentials = self.make_credentials(lifetime=None) |
| expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:signBlob" |
| self._sign_bytes_helper( |
| credentials, |
| mock_donor_credentials, |
| mock_authorizedsession_sign, |
| expected_url, |
| ) |
| |
| def test_sign_bytes_nonGdu( |
| self, mock_donor_credentials, mock_authorizedsession_sign |
| ): |
| source_credentials = service_account.Credentials( |
| SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" |
| ) |
| credentials = self.make_credentials( |
| lifetime=None, source_credentials=source_credentials |
| ) |
| expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/[email protected]:signBlob" |
| self._sign_bytes_helper( |
| credentials, |
| mock_donor_credentials, |
| mock_authorizedsession_sign, |
| expected_url, |
| ) |
| |
| def _sign_bytes_helper( |
| self, |
| credentials, |
| mock_donor_credentials, |
| mock_authorizedsession_sign, |
| expected_url, |
| ): |
| token = "token" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| token_response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| response = mock.create_autospec(transport.Response, instance=False) |
| response.status = http_client.OK |
| response.data = _helpers.to_bytes(json.dumps(token_response_body)) |
| |
| request = mock.create_autospec(transport.Request, instance=False) |
| request.return_value = response |
| |
| credentials.refresh(request) |
| assert credentials.valid |
| assert not credentials.expired |
| |
| signature = credentials.sign_bytes(b"signed bytes") |
| mock_authorizedsession_sign.assert_called_with( |
| mock.ANY, |
| "POST", |
| expected_url, |
| None, |
| json={"payload": "c2lnbmVkIGJ5dGVz", "delegates": []}, |
| headers={"Content-Type": "application/json"}, |
| ) |
| |
| assert signature == b"signature" |
| |
| def test_sign_bytes_failure(self): |
| credentials = self.make_credentials(lifetime=None) |
| |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.request", autospec=True |
| ) as auth_session: |
| data = {"error": {"code": 403, "message": "unauthorized"}} |
| mock_response = MockResponse(data, http_client.UNAUTHORIZED) |
| auth_session.return_value = mock_response |
| |
| with pytest.raises(exceptions.TransportError) as excinfo: |
| credentials.sign_bytes(b"foo") |
| assert excinfo.match("'code': 403") |
| |
| @mock.patch("time.sleep", return_value=None) |
| def test_sign_bytes_retryable_failure(self, mock_time): |
| credentials = self.make_credentials(lifetime=None) |
| |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.request", autospec=True |
| ) as auth_session: |
| data = {"error": {"code": 500, "message": "internal_failure"}} |
| mock_response = MockResponse(data, http_client.INTERNAL_SERVER_ERROR) |
| auth_session.return_value = mock_response |
| |
| with pytest.raises(exceptions.TransportError) as excinfo: |
| credentials.sign_bytes(b"foo") |
| assert excinfo.match("exhausted signBlob endpoint retries") |
| |
| def test_with_quota_project(self): |
| credentials = self.make_credentials() |
| |
| quota_project_creds = credentials.with_quota_project("project-foo") |
| assert quota_project_creds._quota_project_id == "project-foo" |
| |
| @pytest.mark.parametrize("use_data_bytes", [True, False]) |
| def test_with_quota_project_iam_endpoint_override( |
| self, use_data_bytes, mock_donor_credentials |
| ): |
| credentials = self.make_credentials( |
| lifetime=None, iam_endpoint_override=self.IAM_ENDPOINT_OVERRIDE |
| ) |
| token = "token" |
| # iam_endpoint_override should be copied to created credentials. |
| quota_project_creds = credentials.with_quota_project("project-foo") |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), |
| status=http_client.OK, |
| use_data_bytes=use_data_bytes, |
| ) |
| |
| quota_project_creds.refresh(request) |
| |
| assert quota_project_creds.valid |
| assert not quota_project_creds.expired |
| # Confirm override endpoint used. |
| request_kwargs = request.call_args[1] |
| assert request_kwargs["url"] == self.IAM_ENDPOINT_OVERRIDE |
| |
| def test_with_scopes(self): |
| credentials = self.make_credentials() |
| credentials._target_scopes = [] |
| assert credentials.requires_scopes is True |
| credentials = credentials.with_scopes(["fake_scope1", "fake_scope2"]) |
| assert credentials.requires_scopes is False |
| assert credentials._target_scopes == ["fake_scope1", "fake_scope2"] |
| |
| def test_with_trust_boundary(self): |
| credentials = self.make_credentials() |
| new_boundary = {"encodedLocations": "new_boundary"} |
| new_credentials = credentials.with_trust_boundary(new_boundary) |
| |
| assert new_credentials is not credentials |
| assert new_credentials._trust_boundary == new_boundary |
| # The source credentials should be a copy, not the same object. |
| # But they should be functionally equivalent. |
| assert ( |
| new_credentials._source_credentials is not credentials._source_credentials |
| ) |
| |
| assert ( |
| new_credentials._source_credentials.service_account_email |
| == credentials._source_credentials.service_account_email |
| ) |
| assert ( |
| new_credentials._source_credentials._signer |
| == credentials._source_credentials._signer |
| ) |
| assert new_credentials._target_principal == credentials._target_principal |
| |
| def test_build_trust_boundary_lookup_url_no_email(self): |
| credentials = self.make_credentials(target_principal=None) |
| |
| with pytest.raises(ValueError) as excinfo: |
| credentials._build_trust_boundary_lookup_url() |
| |
| assert "Service account email is required" in str(excinfo.value) |
| |
| def test_with_scopes_provide_default_scopes(self): |
| credentials = self.make_credentials() |
| credentials._target_scopes = [] |
| credentials = credentials.with_scopes( |
| ["fake_scope1"], default_scopes=["fake_scope2"] |
| ) |
| assert credentials._target_scopes == ["fake_scope1"] |
| |
| def test_id_token_success( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| credentials = self.make_credentials(lifetime=None) |
| token = "token" |
| target_audience = "https://foo.bar" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience=target_audience |
| ) |
| id_creds.refresh(request) |
| |
| assert id_creds.token == ID_TOKEN_DATA |
| expected_expiry = _helpers.utcfromtimestamp(ID_TOKEN_EXPIRY) |
| assert id_creds.expiry == expected_expiry |
| |
| def test_id_token_metrics(self, mock_donor_credentials): |
| credentials = self.make_credentials(lifetime=None) |
| credentials.token = "token" |
| credentials.expiry = None |
| target_audience = "https://foo.bar" |
| |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience=target_audience |
| ) |
| |
| with mock.patch( |
| "google.auth.metrics.token_request_id_token_impersonate", |
| return_value=ID_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ): |
| with mock.patch( |
| "google.auth.transport.requests.AuthorizedSession.post", autospec=True |
| ) as mock_post: |
| data = {"token": ID_TOKEN_DATA} |
| mock_post.return_value = MockResponse(data, http_client.OK) |
| id_creds.refresh(None) |
| |
| assert id_creds.token == ID_TOKEN_DATA |
| expected_expiry = _helpers.utcfromtimestamp(ID_TOKEN_EXPIRY) |
| assert id_creds.expiry == expected_expiry |
| assert ( |
| mock_post.call_args.kwargs["headers"]["x-goog-api-client"] |
| == ID_TOKEN_REQUEST_METRICS_HEADER_VALUE |
| ) |
| |
| def test_id_token_from_credential( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| credentials = self.make_credentials(lifetime=None) |
| target_credentials = self.make_credentials(lifetime=None) |
| expected_url = "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:generateIdToken" |
| self._test_id_token_helper( |
| credentials, |
| target_credentials, |
| mock_donor_credentials, |
| mock_authorizedsession_idtoken, |
| expected_url, |
| ) |
| |
| def test_id_token_from_credential_nonGdu( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| source_credentials = service_account.Credentials( |
| SIGNER, "[email protected]", TOKEN_URI, universe_domain="foo.bar" |
| ) |
| credentials = self.make_credentials( |
| lifetime=None, source_credentials=source_credentials |
| ) |
| target_credentials = self.make_credentials( |
| lifetime=None, source_credentials=source_credentials |
| ) |
| expected_url = "https://iamcredentials.foo.bar/v1/projects/-/serviceAccounts/[email protected]:generateIdToken" |
| self._test_id_token_helper( |
| credentials, |
| target_credentials, |
| mock_donor_credentials, |
| mock_authorizedsession_idtoken, |
| expected_url, |
| ) |
| |
| def _test_id_token_helper( |
| self, |
| credentials, |
| target_credentials, |
| mock_donor_credentials, |
| mock_authorizedsession_idtoken, |
| expected_url, |
| ): |
| token = "token" |
| target_audience = "https://foo.bar" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience=target_audience, include_email=True |
| ) |
| id_creds = id_creds.from_credentials(target_credentials=target_credentials) |
| id_creds.refresh(request) |
| |
| args = mock_authorizedsession_idtoken.call_args.args |
| |
| assert args[2] == expected_url |
| |
| assert id_creds.token == ID_TOKEN_DATA |
| assert id_creds._include_email is True |
| assert id_creds._target_credentials is target_credentials |
| |
| def test_id_token_with_target_audience( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| credentials = self.make_credentials(lifetime=None) |
| token = "token" |
| target_audience = "https://foo.bar" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, include_email=True |
| ) |
| id_creds = id_creds.with_target_audience(target_audience=target_audience) |
| id_creds.refresh(request) |
| |
| assert id_creds.token == ID_TOKEN_DATA |
| expected_expiry = _helpers.utcfromtimestamp(ID_TOKEN_EXPIRY) |
| assert id_creds.expiry == expected_expiry |
| assert id_creds._include_email is True |
| |
| def test_id_token_invalid_cred( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| credentials = None |
| |
| with pytest.raises(exceptions.GoogleAuthError) as excinfo: |
| impersonated_credentials.IDTokenCredentials(credentials) |
| |
| assert excinfo.match("Provided Credential must be" " impersonated_credentials") |
| |
| def test_id_token_with_include_email( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| credentials = self.make_credentials(lifetime=None) |
| token = "token" |
| target_audience = "https://foo.bar" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience=target_audience |
| ) |
| id_creds = id_creds.with_include_email(True) |
| id_creds.refresh(request) |
| |
| assert id_creds.token == ID_TOKEN_DATA |
| |
| def test_id_token_with_quota_project( |
| self, mock_donor_credentials, mock_authorizedsession_idtoken |
| ): |
| credentials = self.make_credentials(lifetime=None) |
| token = "token" |
| target_audience = "https://foo.bar" |
| |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500) |
| ).isoformat("T") + "Z" |
| response_body = {"accessToken": token, "expireTime": expire_time} |
| |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| |
| id_creds = impersonated_credentials.IDTokenCredentials( |
| credentials, target_audience=target_audience |
| ) |
| id_creds = id_creds.with_quota_project("project-foo") |
| id_creds.refresh(request) |
| |
| assert id_creds.quota_project_id == "project-foo" |
| |
| def test_sign_jwt_request_success(self): |
| principal = "[email protected]" |
| expected_signed_jwt = "correct_signed_jwt" |
| |
| response_body = {"keyId": "1", "signedJwt": expected_signed_jwt} |
| request = self.make_request( |
| data=json.dumps(response_body), status=http_client.OK |
| ) |
| |
| signed_jwt = impersonated_credentials._sign_jwt_request( |
| request=request, principal=principal, headers={}, payload={} |
| ) |
| |
| assert signed_jwt == expected_signed_jwt |
| request.assert_called_once_with( |
| url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/[email protected]:signJwt", |
| method="POST", |
| headers={}, |
| body=json.dumps({"delegates": [], "payload": json.dumps({})}).encode( |
| "utf-8" |
| ), |
| ) |
| |
| def test_sign_jwt_request_http_error(self): |
| principal = "[email protected]" |
| |
| request = self.make_request( |
| data="error_message", status=http_client.BAD_REQUEST |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| _ = impersonated_credentials._sign_jwt_request( |
| request=request, principal=principal, headers={}, payload={} |
| ) |
| |
| assert excinfo.match(impersonated_credentials._REFRESH_ERROR) |
| |
| assert excinfo.value.args[0] == "Unable to acquire impersonated credentials" |
| assert excinfo.value.args[1] == "error_message" |
| |
| def test_sign_jwt_request_invalid_response_error(self): |
| principal = "[email protected]" |
| |
| request = self.make_request(data="invalid_data", status=http_client.OK) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| _ = impersonated_credentials._sign_jwt_request( |
| request=request, principal=principal, headers={}, payload={} |
| ) |
| |
| assert excinfo.match(impersonated_credentials._REFRESH_ERROR) |
| |
| assert ( |
| excinfo.value.args[0] |
| == "Unable to acquire impersonated credentials: No signed JWT in response." |
| ) |
| assert excinfo.value.args[1] == "invalid_data" |