| # Copyright 2020 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import datetime |
| import http.client as http_client |
| import json |
| import os |
| from unittest import mock |
| import urllib |
| |
| import pytest # type: ignore |
| |
| from google.auth import _helpers |
| from google.auth import environment_vars |
| from google.auth import exceptions |
| from google.auth import external_account |
| from google.auth import transport |
| from google.auth.credentials import DEFAULT_UNIVERSE_DOMAIN |
| from google.auth.credentials import TokenState |
| |
| IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE = ( |
| "gl-python/3.7 auth/1.1 auth-request-type/at cred-type/imp" |
| ) |
| LANG_LIBRARY_METRICS_HEADER_VALUE = "gl-python/3.7 auth/1.1" |
| |
| CLIENT_ID = "username" |
| CLIENT_SECRET = "password" |
| # Base64 encoding of "username:password" |
| BASIC_AUTH_ENCODING = "dXNlcm5hbWU6cGFzc3dvcmQ=" |
| SERVICE_ACCOUNT_EMAIL = "[email protected]" |
| # List of valid workforce pool audiences. |
| TEST_USER_AUDIENCES = [ |
| "//iam.googleapis.com/locations/global/workforcePools/pool-id/providers/provider-id", |
| "//iam.googleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id", |
| "//iam.googleapis.com/locations/eu/workforcePools/workloadIdentityPools/providers/provider-id", |
| ] |
| # Workload identity pool audiences or invalid workforce pool audiences. |
| TEST_NON_USER_AUDIENCES = [ |
| # Legacy K8s audience format. |
| "identitynamespace:1f12345:my_provider", |
| ( |
| "//iam.googleapis.com/projects/123456/locations/" |
| "global/workloadIdentityPools/pool-id/providers/" |
| "provider-id" |
| ), |
| ( |
| "//iam.googleapis.com/projects/123456/locations/" |
| "eu/workloadIdentityPools/pool-id/providers/" |
| "provider-id" |
| ), |
| # Pool ID with workforcePools string. |
| ( |
| "//iam.googleapis.com/projects/123456/locations/" |
| "global/workloadIdentityPools/workforcePools/providers/" |
| "provider-id" |
| ), |
| # Unrealistic / incorrect workforce pool audiences. |
| "//iamgoogleapis.com/locations/eu/workforcePools/pool-id/providers/provider-id", |
| "//iam.googleapiscom/locations/eu/workforcePools/pool-id/providers/provider-id", |
| "//iam.googleapis.com/locations/workforcePools/pool-id/providers/provider-id", |
| "//iam.googleapis.com/locations/eu/workforcePool/pool-id/providers/provider-id", |
| "//iam.googleapis.com/locations//workforcePool/pool-id/providers/provider-id", |
| ] |
| |
| |
| class CredentialsImpl(external_account.Credentials): |
| def __init__(self, **kwargs): |
| super(CredentialsImpl, self).__init__(**kwargs) |
| self._counter = 0 |
| |
| def retrieve_subject_token(self, request): |
| counter = self._counter |
| self._counter += 1 |
| return "subject_token_{}".format(counter) |
| |
| |
| class TestCredentials(object): |
| TOKEN_URL = "https://sts.googleapis.com/v1/token" |
| TOKEN_INFO_URL = "https://sts.googleapis.com/v1/introspect" |
| PROJECT_NUMBER = "123456" |
| POOL_ID = "POOL_ID" |
| PROVIDER_ID = "PROVIDER_ID" |
| AUDIENCE = ( |
| "//iam.googleapis.com/projects/{}" |
| "/locations/global/workloadIdentityPools/{}" |
| "/providers/{}" |
| ).format(PROJECT_NUMBER, POOL_ID, PROVIDER_ID) |
| WORKFORCE_AUDIENCE = ( |
| "//iam.googleapis.com/locations/global/workforcePools/{}/providers/{}" |
| ).format(POOL_ID, PROVIDER_ID) |
| WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER" |
| SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:jwt" |
| WORKFORCE_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:id_token" |
| CREDENTIAL_SOURCE = {"file": "/var/run/secrets/goog.id/token"} |
| SUCCESS_RESPONSE = { |
| "access_token": "ACCESS_TOKEN", |
| "issued_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "token_type": "Bearer", |
| "expires_in": 3600, |
| "scope": "scope1 scope2", |
| } |
| ERROR_RESPONSE = { |
| "error": "invalid_request", |
| "error_description": "Invalid subject token", |
| "error_uri": "https://tools.ietf.org/html/rfc6749", |
| } |
| QUOTA_PROJECT_ID = "QUOTA_PROJECT_ID" |
| SERVICE_ACCOUNT_IMPERSONATION_URL = ( |
| "https://us-east1-iamcredentials.googleapis.com/v1/projects/-" |
| + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL) |
| ) |
| SCOPES = ["scope1", "scope2"] |
| IMPERSONATION_ERROR_RESPONSE = { |
| "error": { |
| "code": 400, |
| "message": "Request contains an invalid argument", |
| "status": "INVALID_ARGUMENT", |
| } |
| } |
| NO_OP_TRUST_BOUNDARY = {"locations": [], "encodedLocations": "0x0"} |
| VALID_TRUST_BOUNDARY = { |
| "locations": ["us-central1", "us-east1"], |
| "encodedLocations": "0xVALIDHEXSA", |
| } |
| PROJECT_ID = "my-proj-id" |
| CLOUD_RESOURCE_MANAGER_URL = ( |
| "https://cloudresourcemanager.googleapis.com/v1/projects/" |
| ) |
| CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE = { |
| "projectNumber": PROJECT_NUMBER, |
| "projectId": PROJECT_ID, |
| "lifecycleState": "ACTIVE", |
| "name": "project-name", |
| "createTime": "2018-11-06T04:42:54.109Z", |
| "parent": {"type": "folder", "id": "12345678901"}, |
| } |
| |
| @classmethod |
| def make_credentials( |
| cls, |
| client_id=None, |
| client_secret=None, |
| quota_project_id=None, |
| token_info_url=None, |
| scopes=None, |
| default_scopes=None, |
| service_account_impersonation_url=None, |
| service_account_impersonation_options={}, |
| universe_domain=DEFAULT_UNIVERSE_DOMAIN, |
| trust_boundary=None, |
| ): |
| return CredentialsImpl( |
| audience=cls.AUDIENCE, |
| subject_token_type=cls.SUBJECT_TOKEN_TYPE, |
| token_url=cls.TOKEN_URL, |
| token_info_url=token_info_url, |
| service_account_impersonation_url=service_account_impersonation_url, |
| service_account_impersonation_options=service_account_impersonation_options, |
| credential_source=cls.CREDENTIAL_SOURCE, |
| client_id=client_id, |
| client_secret=client_secret, |
| quota_project_id=quota_project_id, |
| scopes=scopes, |
| default_scopes=default_scopes, |
| universe_domain=universe_domain, |
| trust_boundary=trust_boundary, |
| ) |
| |
| @classmethod |
| def make_workforce_pool_credentials( |
| cls, |
| client_id=None, |
| client_secret=None, |
| quota_project_id=None, |
| scopes=None, |
| default_scopes=None, |
| service_account_impersonation_url=None, |
| workforce_pool_user_project=None, |
| trust_boundary=None, |
| ): |
| return CredentialsImpl( |
| audience=cls.WORKFORCE_AUDIENCE, |
| subject_token_type=cls.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| token_url=cls.TOKEN_URL, |
| service_account_impersonation_url=service_account_impersonation_url, |
| credential_source=cls.CREDENTIAL_SOURCE, |
| client_id=client_id, |
| client_secret=client_secret, |
| quota_project_id=quota_project_id, |
| scopes=scopes, |
| default_scopes=default_scopes, |
| workforce_pool_user_project=workforce_pool_user_project, |
| trust_boundary=trust_boundary, |
| ) |
| |
| @classmethod |
| def make_mock_request( |
| cls, |
| status=http_client.OK, |
| data=None, |
| impersonation_status=None, |
| impersonation_data=None, |
| cloud_resource_manager_status=None, |
| cloud_resource_manager_data=None, |
| ): |
| # STS token exchange request. |
| token_response = mock.create_autospec(transport.Response, instance=True) |
| token_response.status = status |
| token_response.data = json.dumps(data).encode("utf-8") |
| responses = [token_response] |
| |
| # If service account impersonation is requested, mock the expected response. |
| if impersonation_status: |
| impersonation_response = mock.create_autospec( |
| transport.Response, instance=True |
| ) |
| impersonation_response.status = impersonation_status |
| impersonation_response.data = json.dumps(impersonation_data).encode("utf-8") |
| responses.append(impersonation_response) |
| |
| # If cloud resource manager is requested, mock the expected response. |
| if cloud_resource_manager_status: |
| cloud_resource_manager_response = mock.create_autospec( |
| transport.Response, instance=True |
| ) |
| cloud_resource_manager_response.status = cloud_resource_manager_status |
| cloud_resource_manager_response.data = json.dumps( |
| cloud_resource_manager_data |
| ).encode("utf-8") |
| responses.append(cloud_resource_manager_response) |
| |
| request = mock.create_autospec(transport.Request) |
| request.side_effect = responses |
| |
| return request |
| |
| @classmethod |
| def assert_token_request_kwargs( |
| cls, request_kwargs, headers, request_data, cert=None |
| ): |
| assert request_kwargs["url"] == cls.TOKEN_URL |
| assert request_kwargs["method"] == "POST" |
| assert request_kwargs["headers"] == headers |
| if cert is not None: |
| assert request_kwargs["cert"] == cert |
| else: |
| assert "cert" not in request_kwargs |
| assert request_kwargs["body"] is not None |
| body_tuples = urllib.parse.parse_qsl(request_kwargs["body"]) |
| for k, v in body_tuples: |
| assert v.decode("utf-8") == request_data[k.decode("utf-8")] |
| assert len(body_tuples) == len(request_data.keys()) |
| |
| @classmethod |
| def assert_impersonation_request_kwargs( |
| cls, request_kwargs, headers, request_data, cert=None |
| ): |
| assert request_kwargs["url"] == cls.SERVICE_ACCOUNT_IMPERSONATION_URL |
| assert request_kwargs["method"] == "POST" |
| assert request_kwargs["headers"] == headers |
| if cert is not None: |
| assert request_kwargs["cert"] == cert |
| else: |
| assert "cert" not in request_kwargs |
| assert request_kwargs["body"] is not None |
| body_json = json.loads(request_kwargs["body"].decode("utf-8")) |
| assert body_json == request_data |
| |
| @classmethod |
| def assert_resource_manager_request_kwargs( |
| cls, request_kwargs, project_number, headers |
| ): |
| assert request_kwargs["url"] == cls.CLOUD_RESOURCE_MANAGER_URL + project_number |
| assert request_kwargs["method"] == "GET" |
| assert request_kwargs["headers"] == headers |
| assert "body" not in request_kwargs |
| |
| def test_get_cred_info(self): |
| credentials = self.make_credentials() |
| assert not credentials.get_cred_info() |
| |
| credentials._cred_file_path = "/path/to/file" |
| assert credentials.get_cred_info() == { |
| "credential_source": "/path/to/file", |
| "credential_type": "external account credentials", |
| } |
| |
| credentials._service_account_impersonation_url = ( |
| self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| assert credentials.get_cred_info() == { |
| "credential_source": "/path/to/file", |
| "credential_type": "external account credentials", |
| "principal": SERVICE_ACCOUNT_EMAIL, |
| } |
| |
| def test__make_copy_get_cred_info(self): |
| credentials = self.make_credentials() |
| credentials._cred_file_path = "/path/to/file" |
| cred_copy = credentials._make_copy() |
| assert cred_copy._cred_file_path == "/path/to/file" |
| |
| def test_default_state(self): |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| |
| # Token url and service account impersonation url should be set |
| assert credentials._token_url |
| assert credentials._service_account_impersonation_url |
| # Not token acquired yet |
| assert not credentials.token |
| assert not credentials.valid |
| # Expiration hasn't been set yet |
| assert not credentials.expiry |
| assert not credentials.expired |
| # Scopes are required |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| assert not credentials.quota_project_id |
| # Token info url not set yet |
| assert not credentials.token_info_url |
| |
| def test_nonworkforce_with_workforce_pool_user_project(self): |
| with pytest.raises(exceptions.InvalidValue) as excinfo: |
| CredentialsImpl( |
| audience=self.AUDIENCE, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT, |
| ) |
| |
| assert excinfo.match( |
| "workforce_pool_user_project should not be set for non-workforce " |
| "pool credentials" |
| ) |
| |
| def test_with_scopes(self): |
| credentials = self.make_credentials() |
| |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| |
| scoped_credentials = credentials.with_scopes(["email"]) |
| |
| assert scoped_credentials.has_scopes(["email"]) |
| assert not scoped_credentials.requires_scopes |
| |
| def test_with_scopes_workforce_pool(self): |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| |
| scoped_credentials = credentials.with_scopes(["email"]) |
| |
| assert scoped_credentials.has_scopes(["email"]) |
| assert not scoped_credentials.requires_scopes |
| assert ( |
| scoped_credentials.info.get("workforce_pool_user_project") |
| == self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| def test_with_scopes_using_user_and_default_scopes(self): |
| credentials = self.make_credentials() |
| |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| |
| scoped_credentials = credentials.with_scopes( |
| ["email"], default_scopes=["profile"] |
| ) |
| |
| assert scoped_credentials.has_scopes(["email"]) |
| assert not scoped_credentials.has_scopes(["profile"]) |
| assert not scoped_credentials.requires_scopes |
| assert scoped_credentials.scopes == ["email"] |
| assert scoped_credentials.default_scopes == ["profile"] |
| |
| def test_with_scopes_using_default_scopes_only(self): |
| credentials = self.make_credentials() |
| |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| |
| scoped_credentials = credentials.with_scopes(None, default_scopes=["profile"]) |
| |
| assert scoped_credentials.has_scopes(["profile"]) |
| assert not scoped_credentials.requires_scopes |
| |
| def test_with_scopes_full_options_propagated(self): |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| scopes=self.SCOPES, |
| token_info_url=self.TOKEN_INFO_URL, |
| default_scopes=["default1"], |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| service_account_impersonation_options={"token_lifetime_seconds": 2800}, |
| ) |
| |
| with mock.patch.object( |
| external_account.Credentials, "__init__", return_value=None |
| ) as mock_init: |
| credentials.with_scopes(["email"], ["default2"]) |
| |
| # Confirm with_scopes initialized the credential with the expected |
| # parameters and scopes. |
| mock_init.assert_called_once_with( |
| audience=self.AUDIENCE, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| token_info_url=self.TOKEN_INFO_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| service_account_impersonation_options={"token_lifetime_seconds": 2800}, |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| scopes=["email"], |
| default_scopes=["default2"], |
| universe_domain=DEFAULT_UNIVERSE_DOMAIN, |
| trust_boundary=None, |
| ) |
| |
| def test_with_token_uri(self): |
| credentials = self.make_credentials() |
| new_token_uri = "https://eu-sts.googleapis.com/v1/token" |
| |
| assert credentials._token_url == self.TOKEN_URL |
| |
| creds_with_new_token_uri = credentials.with_token_uri(new_token_uri) |
| |
| assert creds_with_new_token_uri._token_url == new_token_uri |
| |
| def test_with_token_uri_workforce_pool(self): |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| new_token_uri = "https://eu-sts.googleapis.com/v1/token" |
| |
| assert credentials._token_url == self.TOKEN_URL |
| |
| creds_with_new_token_uri = credentials.with_token_uri(new_token_uri) |
| |
| assert creds_with_new_token_uri._token_url == new_token_uri |
| assert ( |
| creds_with_new_token_uri.info.get("workforce_pool_user_project") |
| == self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| def test_with_quota_project(self): |
| credentials = self.make_credentials() |
| |
| assert not credentials.scopes |
| assert not credentials.quota_project_id |
| |
| quota_project_creds = credentials.with_quota_project("project-foo") |
| |
| assert quota_project_creds.quota_project_id == "project-foo" |
| |
| def test_with_quota_project_workforce_pool(self): |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| assert not credentials.scopes |
| assert not credentials.quota_project_id |
| |
| quota_project_creds = credentials.with_quota_project("project-foo") |
| |
| assert quota_project_creds.quota_project_id == "project-foo" |
| assert ( |
| quota_project_creds.info.get("workforce_pool_user_project") |
| == self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| def test_with_quota_project_full_options_propagated(self): |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| token_info_url=self.TOKEN_INFO_URL, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| scopes=self.SCOPES, |
| default_scopes=["default1"], |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| service_account_impersonation_options={"token_lifetime_seconds": 2800}, |
| ) |
| |
| with mock.patch.object( |
| external_account.Credentials, "__init__", return_value=None |
| ) as mock_init: |
| new_cred = credentials.with_quota_project("project-foo") |
| |
| # Confirm with_quota_project initialized the credential with the |
| # expected parameters. |
| mock_init.assert_called_once_with( |
| audience=self.AUDIENCE, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| token_info_url=self.TOKEN_INFO_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| service_account_impersonation_options={"token_lifetime_seconds": 2800}, |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| scopes=self.SCOPES, |
| default_scopes=["default1"], |
| universe_domain=DEFAULT_UNIVERSE_DOMAIN, |
| trust_boundary=None, |
| ) |
| |
| # Confirm with_quota_project sets the correct quota project after |
| # initialization. |
| assert new_cred.quota_project_id == "project-foo" |
| |
| def test_info(self): |
| credentials = self.make_credentials(universe_domain="dummy_universe.com") |
| |
| assert credentials.info == { |
| "type": "external_account", |
| "audience": self.AUDIENCE, |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "token_url": self.TOKEN_URL, |
| "credential_source": self.CREDENTIAL_SOURCE.copy(), |
| "universe_domain": "dummy_universe.com", |
| } |
| |
| def test_universe_domain(self): |
| credentials = self.make_credentials(universe_domain="dummy_universe.com") |
| assert credentials.universe_domain == "dummy_universe.com" |
| |
| credentials = self.make_credentials() |
| assert credentials.universe_domain == DEFAULT_UNIVERSE_DOMAIN |
| |
| def test_with_universe_domain(self): |
| credentials = self.make_credentials() |
| new_credentials = credentials.with_universe_domain("dummy_universe.com") |
| assert new_credentials.universe_domain == "dummy_universe.com" |
| |
| def test_info_workforce_pool(self): |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| assert credentials.info == { |
| "type": "external_account", |
| "audience": self.WORKFORCE_AUDIENCE, |
| "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| "token_url": self.TOKEN_URL, |
| "credential_source": self.CREDENTIAL_SOURCE.copy(), |
| "workforce_pool_user_project": self.WORKFORCE_POOL_USER_PROJECT, |
| "universe_domain": DEFAULT_UNIVERSE_DOMAIN, |
| } |
| |
| def test_info_with_full_options(self): |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| token_info_url=self.TOKEN_INFO_URL, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| service_account_impersonation_options={"token_lifetime_seconds": 2800}, |
| ) |
| |
| assert credentials.info == { |
| "type": "external_account", |
| "audience": self.AUDIENCE, |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "token_url": self.TOKEN_URL, |
| "token_info_url": self.TOKEN_INFO_URL, |
| "service_account_impersonation_url": self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| "service_account_impersonation": {"token_lifetime_seconds": 2800}, |
| "credential_source": self.CREDENTIAL_SOURCE.copy(), |
| "quota_project_id": self.QUOTA_PROJECT_ID, |
| "client_id": CLIENT_ID, |
| "client_secret": CLIENT_SECRET, |
| "universe_domain": DEFAULT_UNIVERSE_DOMAIN, |
| } |
| |
| def test_service_account_email_without_impersonation(self): |
| credentials = self.make_credentials() |
| |
| assert credentials.service_account_email is None |
| |
| def test_service_account_email_with_impersonation(self): |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| |
| assert credentials.service_account_email == SERVICE_ACCOUNT_EMAIL |
| |
| @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES) |
| def test_is_user_with_non_users(self, audience): |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.is_user is False |
| |
| @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES) |
| def test_is_user_with_users(self, audience): |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.is_user is True |
| |
| @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES) |
| def test_is_user_with_users_and_impersonation(self, audience): |
| # Initialize the credentials with service account impersonation. |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| ) |
| |
| # Even though the audience is for a workforce pool, since service account |
| # impersonation is used, the credentials will represent a service account and |
| # not a user. |
| assert credentials.is_user is False |
| |
| @pytest.mark.parametrize("audience", TEST_NON_USER_AUDIENCES) |
| def test_is_workforce_pool_with_non_users(self, audience): |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.is_workforce_pool is False |
| |
| @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES) |
| def test_is_workforce_pool_with_users(self, audience): |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.is_workforce_pool is True |
| |
| @pytest.mark.parametrize("audience", TEST_USER_AUDIENCES) |
| def test_is_workforce_pool_with_users_and_impersonation(self, audience): |
| # Initialize the credentials with workforce audience and service account |
| # impersonation. |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| ) |
| |
| # Even though impersonation is used, is_workforce_pool should still return True. |
| assert credentials.is_workforce_pool is True |
| |
| @pytest.mark.parametrize("mock_expires_in", [2800, "2800"]) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) |
| def test_refresh_without_client_auth_success( |
| self, unused_utcnow, mock_auth_lib_value, mock_expires_in |
| ): |
| response = self.SUCCESS_RESPONSE.copy() |
| # Test custom expiration to confirm expiry is set correctly. |
| response["expires_in"] = mock_expires_in |
| expected_expiry = datetime.datetime.min + datetime.timedelta( |
| seconds=int(mock_expires_in) |
| ) |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request(status=http_client.OK, data=response) |
| credentials = self.make_credentials() |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == response["access_token"] |
| |
| @mock.patch("google.auth.external_account.Credentials._lookup_trust_boundary") |
| def test_refresh_skips_trust_boundary_lookup_when_disabled( |
| self, mock_lookup_trust_boundary |
| ): |
| credentials = self.make_credentials() |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| |
| credentials.refresh(request) |
| |
| assert credentials.valid |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| mock_lookup_trust_boundary.assert_not_called() |
| headers_applied = {} |
| credentials.apply(headers_applied) |
| assert "x-allowed-locations" not in headers_applied |
| |
| def test_perform_refresh_token_with_cert_fingerprint(self): |
| credentials = self.make_credentials() |
| credentials._sts_client = mock.MagicMock() |
| credentials._sts_client.exchange_token.return_value = { |
| "access_token": "token", |
| "expires_in": 3600, |
| } |
| credentials.retrieve_subject_token = mock.MagicMock( |
| return_value="subject_token" |
| ) |
| |
| credentials._perform_refresh_token( |
| request=mock.sentinel.request, cert_fingerprint="my-fingerprint" |
| ) |
| |
| _, kwargs = credentials._sts_client.exchange_token.call_args |
| assert kwargs["additional_options"]["bindCertFingerprint"] == "my-fingerprint" |
| |
| def test_refresh_skips_sending_allowed_locations_header_with_trust_boundary(self): |
| # This test verifies that the x-allowed-locations header is not sent with |
| # the STS request even if a trust boundary is cached. |
| trust_boundary_value = {"encodedLocations": "0x12345"} |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| # Set a cached trust boundary. |
| credentials._trust_boundary = trust_boundary_value |
| |
| with mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ): |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| |
| def test_refresh_on_impersonated_credential_skips_parent_trust_boundary_lookup( |
| self, |
| ): |
| # This test verifies that the top-level impersonating credential |
| # does not perform a trust boundary lookup. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE, |
| impersonation_status=http_client.OK, |
| impersonation_data={ |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": "2025-01-01T00:00:00Z", |
| }, |
| ) |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| |
| with mock.patch.object( |
| credentials, "_refresh_trust_boundary", autospec=True |
| ) as mock_refresh_trust_boundary: |
| credentials.refresh(request) |
| |
| mock_refresh_trust_boundary.assert_not_called() |
| |
| def test_refresh_fetches_no_op_trust_boundary(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| |
| with mock.patch.object( |
| credentials, |
| "_lookup_trust_boundary", |
| return_value=self.NO_OP_TRUST_BOUNDARY, |
| ) as mock_lookup, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| mock_lookup.assert_called_once() |
| headers = {} |
| credentials.apply(headers) |
| assert headers["x-allowed-locations"] == "" |
| |
| def test_refresh_skips_lookup_with_cached_no_op_boundary(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| credentials._trust_boundary = self.NO_OP_TRUST_BOUNDARY |
| |
| with mock.patch.object( |
| credentials, "_lookup_trust_boundary" |
| ) as mock_lookup, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| mock_lookup.assert_not_called() |
| headers = {} |
| credentials.apply(headers) |
| assert headers["x-allowed-locations"] == "" |
| |
| def test_refresh_fails_on_lookup_failure_with_no_cache(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| |
| with mock.patch.object( |
| credentials, |
| "_lookup_trust_boundary", |
| side_effect=exceptions.RefreshError("Lookup failed"), |
| ) as mock_lookup, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ), pytest.raises( |
| exceptions.RefreshError, match="Lookup failed" |
| ): |
| credentials.refresh(request) |
| |
| mock_lookup.assert_called_once() |
| |
| def test_refresh_uses_cached_boundary_on_lookup_failure(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| credentials._trust_boundary = {"encodedLocations": "0x123"} |
| |
| with mock.patch.object( |
| credentials, |
| "_lookup_trust_boundary", |
| side_effect=exceptions.RefreshError("Lookup failed"), |
| ) as mock_lookup, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| mock_lookup.assert_called_once() |
| headers = {} |
| credentials.apply(headers) |
| assert headers["x-allowed-locations"] == "0x123" |
| |
| def test_refresh_propagates_trust_boundary_to_impersonated_credential(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| trust_boundary=self.VALID_TRUST_BOUNDARY, |
| ) |
| impersonated_creds_mock = mock.Mock() |
| impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY |
| |
| with mock.patch( |
| "google.auth.external_account.impersonated_credentials.Credentials", |
| return_value=impersonated_creds_mock, |
| ) as mock_impersonated_creds, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| mock_impersonated_creds.assert_called_once_with( |
| source_credentials=mock.ANY, |
| target_principal=mock.ANY, |
| target_scopes=mock.ANY, |
| quota_project_id=mock.ANY, |
| iam_endpoint_override=mock.ANY, |
| lifetime=mock.ANY, |
| trust_boundary=self.VALID_TRUST_BOUNDARY, |
| ) |
| assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| |
| def test_build_trust_boundary_lookup_url_workload(self): |
| credentials = self.make_credentials() |
| expected_url = "https://iamcredentials.googleapis.com/v1/projects/123456/locations/global/workloadIdentityPools/POOL_ID/allowedLocations" |
| assert credentials._build_trust_boundary_lookup_url() == expected_url |
| |
| def test_build_trust_boundary_lookup_url_workforce(self): |
| credentials = self.make_workforce_pool_credentials() |
| expected_url = "https://iamcredentials.googleapis.com/v1/locations/global/workforcePools/POOL_ID/allowedLocations" |
| assert credentials._build_trust_boundary_lookup_url() == expected_url |
| |
| @pytest.mark.parametrize( |
| "audience", |
| [ |
| "invalid", |
| "//iam.googleapis.com/projects/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID", |
| "//iam.googleapis.com/locations/global/workforcsePools//providers/provider-id", |
| ], |
| ) |
| def test_build_trust_boundary_lookup_url_invalid_audience(self, audience): |
| credentials = self.make_credentials() |
| credentials._audience = audience |
| with pytest.raises(exceptions.InvalidValue, match="Invalid audience format."): |
| credentials._build_trust_boundary_lookup_url() |
| |
| def test_refresh_fetches_trust_boundary_workload(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| |
| with mock.patch.object( |
| credentials, |
| "_lookup_trust_boundary", |
| return_value=self.VALID_TRUST_BOUNDARY, |
| ) as mock_lookup, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| mock_lookup.assert_called_once() |
| headers = {} |
| credentials.apply(headers) |
| assert ( |
| headers["x-allowed-locations"] |
| == self.VALID_TRUST_BOUNDARY["encodedLocations"] |
| ) |
| |
| def test_refresh_fetches_trust_boundary_workforce(self): |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_workforce_pool_credentials() |
| |
| with mock.patch.object( |
| credentials, |
| "_lookup_trust_boundary", |
| return_value=self.VALID_TRUST_BOUNDARY, |
| ) as mock_lookup, mock.patch.dict( |
| os.environ, {environment_vars.GOOGLE_AUTH_TRUST_BOUNDARY_ENABLED: "true"} |
| ): |
| credentials.refresh(request) |
| |
| mock_lookup.assert_called_once() |
| headers = {} |
| credentials.apply(headers) |
| assert ( |
| headers["x-allowed-locations"] |
| == self.VALID_TRUST_BOUNDARY["encodedLocations"] |
| ) |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) |
| @mock.patch( |
| "google.auth.external_account.Credentials._mtls_required", return_value=True |
| ) |
| @mock.patch( |
| "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths", |
| return_value=("path/to/cert.pem", "path/to/key.pem"), |
| ) |
| def test_refresh_with_mtls( |
| self, |
| mock_get_mtls_cert_and_key_paths, |
| mock_mtls_required, |
| unused_utcnow, |
| mock_auth_lib_value, |
| ): |
| response = self.SUCCESS_RESPONSE.copy() |
| # Test custom expiration to confirm expiry is set correctly. |
| response["expires_in"] = 2800 |
| expected_expiry = datetime.datetime.min + datetime.timedelta( |
| seconds=response["expires_in"] |
| ) |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request(status=http_client.OK, data=response) |
| credentials = self.make_credentials() |
| |
| credentials.refresh(request) |
| |
| expected_cert_path = ("path/to/cert.pem", "path/to/key.pem") |
| self.assert_token_request_kwargs( |
| request.call_args[1], headers, request_data, expected_cert_path |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == response["access_token"] |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) |
| def test_refresh_workforce_without_client_auth_success( |
| self, unused_utcnow, test_auth_lib_value |
| ): |
| response = self.SUCCESS_RESPONSE.copy() |
| # Test custom expiration to confirm expiry is set correctly. |
| response["expires_in"] = 2800 |
| expected_expiry = datetime.datetime.min + datetime.timedelta( |
| seconds=response["expires_in"] |
| ) |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.WORKFORCE_AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| "options": urllib.parse.quote( |
| json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT}) |
| ), |
| } |
| request = self.make_mock_request(status=http_client.OK, data=response) |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == response["access_token"] |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) |
| def test_refresh_workforce_with_client_auth_success( |
| self, unused_utcnow, mock_auth_lib_value |
| ): |
| response = self.SUCCESS_RESPONSE.copy() |
| # Test custom expiration to confirm expiry is set correctly. |
| response["expires_in"] = 2800 |
| expected_expiry = datetime.datetime.min + datetime.timedelta( |
| seconds=response["expires_in"] |
| ) |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.WORKFORCE_AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request(status=http_client.OK, data=response) |
| # Client Auth will have higher priority over workforce_pool_user_project. |
| credentials = self.make_workforce_pool_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT, |
| ) |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == response["access_token"] |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min) |
| def test_refresh_workforce_with_client_auth_and_no_workforce_project_success( |
| self, unused_utcnow, mock_lib_version_value |
| ): |
| response = self.SUCCESS_RESPONSE.copy() |
| # Test custom expiration to confirm expiry is set correctly. |
| response["expires_in"] = 2800 |
| expected_expiry = datetime.datetime.min + datetime.timedelta( |
| seconds=response["expires_in"] |
| ) |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.WORKFORCE_AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request(status=http_client.OK, data=response) |
| # Client Auth will be sufficient for user project determination. |
| credentials = self.make_workforce_pool_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| workforce_pool_user_project=None, |
| ) |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == response["access_token"] |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_impersonation_without_client_auth_success( |
| self, mock_metrics_header_value, mock_auth_lib_value |
| ): |
| # Simulate service account access token expires in 2800 seconds. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "3600s", |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=token_response, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation. |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| ) |
| |
| credentials.refresh(request) |
| |
| # Only 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.external_account.Credentials._mtls_required", return_value=True |
| ) |
| @mock.patch( |
| "google.auth.external_account.Credentials._get_mtls_cert_and_key_paths", |
| return_value=("path/to/cert.pem", "path/to/key.pem"), |
| ) |
| def test_refresh_impersonation_with_mtls_success( |
| self, |
| mock_get_mtls_cert_and_key_paths, |
| mock_mtls_required, |
| mock_metrics_header_value, |
| mock_auth_lib_value, |
| ): |
| # Simulate service account access token expires in 2800 seconds. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "3600s", |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=token_response, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation. |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| ) |
| |
| credentials.refresh(request) |
| |
| # Only 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| expected_cert_paths = ("path/to/cert.pem", "path/to/key.pem") |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], |
| token_headers, |
| token_request_data, |
| expected_cert_paths, |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| expected_cert_paths, |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_workforce_impersonation_without_client_auth_success( |
| self, mock_metrics_header_value, mock_auth_lib_value |
| ): |
| # Simulate service account access token expires in 2800 seconds. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.WORKFORCE_AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| "options": urllib.parse.quote( |
| json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT}) |
| ), |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "3600s", |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=token_response, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation. |
| credentials = self.make_workforce_pool_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT, |
| ) |
| |
| credentials.refresh(request) |
| |
| # Only 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_without_client_auth_success_explicit_user_scopes_ignore_default_scopes( |
| self, mock_auth_lib_value |
| ): |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "scope": "scope1 scope2", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials( |
| scopes=["scope1", "scope2"], |
| # Default scopes will be ignored in favor of user scopes. |
| default_scopes=["ignored"], |
| ) |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| assert credentials.has_scopes(["scope1", "scope2"]) |
| assert not credentials.has_scopes(["ignored"]) |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_without_client_auth_success_explicit_default_scopes_only( |
| self, mock_auth_lib_value |
| ): |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "scope": "scope1 scope2", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials( |
| scopes=None, |
| # Default scopes will be used since user scopes are none. |
| default_scopes=["scope1", "scope2"], |
| ) |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| assert credentials.has_scopes(["scope1", "scope2"]) |
| |
| def test_refresh_without_client_auth_error(self): |
| request = self.make_mock_request( |
| status=http_client.BAD_REQUEST, data=self.ERROR_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| |
| with pytest.raises(exceptions.OAuthError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match( |
| r"Error code invalid_request: Invalid subject token - https://tools.ietf.org/html/rfc6749" |
| ) |
| assert not credentials.expired |
| assert credentials.token is None |
| |
| def test_refresh_impersonation_without_client_auth_error(self): |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE, |
| impersonation_status=http_client.BAD_REQUEST, |
| impersonation_data=self.IMPERSONATION_ERROR_RESPONSE, |
| ) |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(request) |
| |
| assert excinfo.match(r"Unable to acquire impersonated credentials") |
| assert not credentials.expired |
| assert credentials.token is None |
| |
| def test_refresh_impersonation_invalid_impersonated_url_error(self): |
| credentials = self.make_credentials( |
| service_account_impersonation_url="https://iamcredentials.googleapis.com/v1/invalid", |
| scopes=self.SCOPES, |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(None) |
| |
| assert excinfo.match( |
| r"Unable to determine target principal from service account impersonation URL." |
| ) |
| assert not credentials.expired |
| assert credentials.token is None |
| |
| def test_refresh_impersonation_invalid_url_format_error(self): |
| credentials = self.make_credentials( |
| service_account_impersonation_url="https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/:generateAccessToken/invalid", |
| scopes=self.SCOPES, |
| ) |
| |
| with pytest.raises(exceptions.RefreshError) as excinfo: |
| credentials.refresh(None) |
| |
| assert excinfo.match( |
| r"Unable to determine target principal from service account impersonation URL." |
| ) |
| assert not credentials.expired |
| assert credentials.token is None |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_with_client_auth_success(self, mock_auth_lib_value): |
| headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| } |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, client_secret=CLIENT_SECRET |
| ) |
| |
| credentials.refresh(request) |
| |
| self.assert_token_request_kwargs(request.call_args[1], headers, request_data) |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_impersonation_with_client_auth_success_ignore_default_scopes( |
| self, mock_metrics_header_value, mock_auth_lib_value |
| ): |
| # Simulate service account access token expires in 2800 seconds. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "3600s", |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=token_response, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation and basic auth. |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| # Default scopes will be ignored since user scopes are specified. |
| default_scopes=["ignored"], |
| ) |
| |
| credentials.refresh(request) |
| |
| # Only 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_impersonation_with_client_auth_success_use_default_scopes( |
| self, mock_metrics_header_value, mock_auth_lib_value |
| ): |
| # Simulate service account access token expires in 2800 seconds. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "Authorization": "Basic {}".format(BASIC_AUTH_ENCODING), |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "3600s", |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=token_response, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation and basic auth. |
| credentials = self.make_credentials( |
| client_id=CLIENT_ID, |
| client_secret=CLIENT_SECRET, |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=None, |
| # Default scopes will be used since user specified scopes are none. |
| default_scopes=self.SCOPES, |
| ) |
| |
| credentials.refresh(request) |
| |
| # Only 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| |
| def test_apply_without_quota_project_id(self): |
| headers = {} |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| |
| credentials.refresh(request) |
| credentials.apply(headers) |
| |
| assert headers == { |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) |
| } |
| |
| def test_apply_workforce_without_quota_project_id(self): |
| headers = {} |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| credentials.refresh(request) |
| credentials.apply(headers) |
| |
| assert headers == { |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) |
| } |
| |
| def test_apply_impersonation_without_quota_project_id(self): |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) |
| ).isoformat("T") + "Z" |
| # Service account impersonation response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation. |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| ) |
| headers = {} |
| |
| credentials.refresh(request) |
| credentials.apply(headers) |
| |
| assert headers == { |
| "authorization": "Bearer {}".format(impersonation_response["accessToken"]) |
| } |
| |
| def test_apply_with_quota_project_id(self): |
| headers = {"other": "header-value"} |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials(quota_project_id=self.QUOTA_PROJECT_ID) |
| |
| credentials.refresh(request) |
| credentials.apply(headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| "x-goog-user-project": self.QUOTA_PROJECT_ID, |
| } |
| |
| def test_apply_impersonation_with_quota_project_id(self): |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) |
| ).isoformat("T") + "Z" |
| # Service account impersonation response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation. |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| ) |
| headers = {"other": "header-value"} |
| |
| credentials.refresh(request) |
| credentials.apply(headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(impersonation_response["accessToken"]), |
| "x-goog-user-project": self.QUOTA_PROJECT_ID, |
| } |
| |
| def test_before_request(self): |
| headers = {"other": "header-value"} |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| |
| # First call should call refresh, setting the token. |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| } |
| |
| # Second call shouldn't call refresh. |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| } |
| |
| def test_before_request_workforce(self): |
| headers = {"other": "header-value"} |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_workforce_pool_credentials( |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT |
| ) |
| |
| # First call should call refresh, setting the token. |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| } |
| |
| # Second call shouldn't call refresh. |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]), |
| } |
| |
| def test_before_request_impersonation(self): |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) |
| ).isoformat("T") + "Z" |
| # Service account impersonation response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| headers = {"other": "header-value"} |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| |
| # First call should call refresh, setting the token. |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(impersonation_response["accessToken"]), |
| } |
| |
| # Second call shouldn't call refresh. |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| assert headers == { |
| "other": "header-value", |
| "authorization": "Bearer {}".format(impersonation_response["accessToken"]), |
| } |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_before_request_expired(self, utcnow): |
| headers = {} |
| request = self.make_mock_request( |
| status=http_client.OK, data=self.SUCCESS_RESPONSE |
| ) |
| credentials = self.make_credentials() |
| credentials.token = "token" |
| utcnow.return_value = datetime.datetime.min |
| # Set the expiration to one second more than now plus the clock skew |
| # accomodation. These credentials should be valid. |
| credentials.expiry = ( |
| datetime.datetime.min |
| + _helpers.REFRESH_THRESHOLD |
| + datetime.timedelta(seconds=1) |
| ) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token_state == TokenState.FRESH |
| |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| # Cached token should be used. |
| assert headers == {"authorization": "Bearer token"} |
| |
| # Next call should simulate 1 second passed. |
| utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1) |
| |
| assert not credentials.valid |
| assert credentials.expired |
| assert credentials.token_state == TokenState.STALE |
| |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| assert credentials.token_state == TokenState.FRESH |
| |
| # New token should be retrieved. |
| assert headers == { |
| "authorization": "Bearer {}".format(self.SUCCESS_RESPONSE["access_token"]) |
| } |
| |
| def test_refresh_impersonation_trust_boundary(self): |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE, |
| impersonation_status=http_client.OK, |
| impersonation_data={ |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": "2025-01-01T00:00:00Z", |
| }, |
| ) |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| impersonated_creds_mock = mock.Mock() |
| impersonated_creds_mock._trust_boundary = self.VALID_TRUST_BOUNDARY |
| |
| with mock.patch( |
| "google.auth.external_account.impersonated_credentials.Credentials", |
| return_value=impersonated_creds_mock, |
| ): |
| credentials.refresh(request) |
| |
| assert credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| |
| def test_with_trust_boundary(self): |
| credentials = self.make_credentials() |
| new_credentials = credentials.with_trust_boundary(self.VALID_TRUST_BOUNDARY) |
| assert new_credentials._trust_boundary == self.VALID_TRUST_BOUNDARY |
| |
| @mock.patch("google.auth._helpers.utcnow") |
| def test_before_request_impersonation_expired(self, utcnow): |
| headers = {} |
| expire_time = ( |
| datetime.datetime.min + datetime.timedelta(seconds=3601) |
| ).isoformat("T") + "Z" |
| # Service account impersonation response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| credentials.token = "token" |
| utcnow.return_value = datetime.datetime.min |
| # Set the expiration to one second more than now plus the clock skew |
| # accomodation. These credentials should be valid. |
| credentials.expiry = ( |
| datetime.datetime.min |
| + _helpers.REFRESH_THRESHOLD |
| + datetime.timedelta(seconds=1) |
| ) |
| |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token_state == TokenState.FRESH |
| |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| assert credentials.token_state == TokenState.FRESH |
| |
| # Cached token should be used. |
| assert headers == {"authorization": "Bearer token"} |
| |
| # Next call should simulate 1 second passed. This will trigger the expiration |
| # threshold. |
| utcnow.return_value = datetime.datetime.min + datetime.timedelta(seconds=1) |
| |
| assert not credentials.valid |
| assert credentials.expired |
| assert credentials.token_state == TokenState.STALE |
| |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| assert credentials.token_state == TokenState.FRESH |
| |
| credentials.before_request(request, "POST", "https://example.com/api", headers) |
| |
| # New token should be retrieved. |
| assert headers == { |
| "authorization": "Bearer {}".format(impersonation_response["accessToken"]) |
| } |
| |
| @pytest.mark.parametrize( |
| "audience", |
| [ |
| # Legacy K8s audience format. |
| "identitynamespace:1f12345:my_provider", |
| # Unrealistic audiences. |
| "//iam.googleapis.com/projects", |
| "//iam.googleapis.com/projects/", |
| "//iam.googleapis.com/project/123456", |
| "//iam.googleapis.com/projects//123456", |
| "//iam.googleapis.com/prefix_projects/123456", |
| "//iam.googleapis.com/projects_suffix/123456", |
| ], |
| ) |
| def test_project_number_indeterminable(self, audience): |
| credentials = CredentialsImpl( |
| audience=audience, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.project_number is None |
| assert credentials.get_project_id(None) is None |
| |
| def test_project_number_determinable(self): |
| credentials = CredentialsImpl( |
| audience=self.AUDIENCE, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.project_number == self.PROJECT_NUMBER |
| |
| def test_project_number_workforce(self): |
| credentials = CredentialsImpl( |
| audience=self.WORKFORCE_AUDIENCE, |
| subject_token_type=self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT, |
| ) |
| |
| assert credentials.project_number is None |
| |
| def test_project_id_without_scopes(self): |
| # Initialize credentials with no scopes. |
| credentials = CredentialsImpl( |
| audience=self.AUDIENCE, |
| subject_token_type=self.SUBJECT_TOKEN_TYPE, |
| token_url=self.TOKEN_URL, |
| credential_source=self.CREDENTIAL_SOURCE, |
| ) |
| |
| assert credentials.get_project_id(None) is None |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_get_project_id_cloud_resource_manager_success( |
| self, mock_metrics_header_value, mock_auth_lib_value |
| ): |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| } |
| # Service account impersonation request/response. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=3600) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "x-goog-user-project": self.QUOTA_PROJECT_ID, |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "3600s", |
| } |
| # Initialize mock request to handle token exchange, service account |
| # impersonation and cloud resource manager request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| cloud_resource_manager_status=http_client.OK, |
| cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE, |
| ) |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| scopes=self.SCOPES, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| ) |
| |
| # Expected project ID from cloud resource manager response should be returned. |
| project_id = credentials.get_project_id(request) |
| |
| assert project_id == self.PROJECT_ID |
| # 3 requests should be processed. |
| assert len(request.call_args_list) == 3 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| # In the process of getting project ID, an access token should be |
| # retrieved. |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| # Verify cloud resource manager request parameters. |
| self.assert_resource_manager_request_kwargs( |
| request.call_args_list[2][1], |
| self.PROJECT_NUMBER, |
| { |
| "x-goog-user-project": self.QUOTA_PROJECT_ID, |
| "authorization": "Bearer {}".format( |
| impersonation_response["accessToken"] |
| ), |
| }, |
| ) |
| |
| # Calling get_project_id again should return the cached project_id. |
| project_id = credentials.get_project_id(request) |
| |
| assert project_id == self.PROJECT_ID |
| # No additional requests. |
| assert len(request.call_args_list) == 3 |
| |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_workforce_pool_get_project_id_cloud_resource_manager_success( |
| self, mock_auth_lib_value |
| ): |
| # STS token exchange request/response. |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/false config-lifetime/false", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.WORKFORCE_AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.WORKFORCE_SUBJECT_TOKEN_TYPE, |
| "scope": "scope1 scope2", |
| "options": urllib.parse.quote( |
| json.dumps({"userProject": self.WORKFORCE_POOL_USER_PROJECT}) |
| ), |
| } |
| # Initialize mock request to handle token exchange and cloud resource |
| # manager request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| cloud_resource_manager_status=http_client.OK, |
| cloud_resource_manager_data=self.CLOUD_RESOURCE_MANAGER_SUCCESS_RESPONSE, |
| ) |
| credentials = self.make_workforce_pool_credentials( |
| scopes=self.SCOPES, |
| quota_project_id=self.QUOTA_PROJECT_ID, |
| workforce_pool_user_project=self.WORKFORCE_POOL_USER_PROJECT, |
| ) |
| |
| # Expected project ID from cloud resource manager response should be returned. |
| project_id = credentials.get_project_id(request) |
| |
| assert project_id == self.PROJECT_ID |
| # 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # In the process of getting project ID, an access token should be |
| # retrieved. |
| assert credentials.valid |
| assert not credentials.expired |
| assert credentials.token == self.SUCCESS_RESPONSE["access_token"] |
| # Verify cloud resource manager request parameters. |
| self.assert_resource_manager_request_kwargs( |
| request.call_args_list[1][1], |
| self.WORKFORCE_POOL_USER_PROJECT, |
| { |
| "x-goog-user-project": self.QUOTA_PROJECT_ID, |
| "authorization": "Bearer {}".format( |
| self.SUCCESS_RESPONSE["access_token"] |
| ), |
| }, |
| ) |
| |
| # Calling get_project_id again should return the cached project_id. |
| project_id = credentials.get_project_id(request) |
| |
| assert project_id == self.PROJECT_ID |
| # No additional requests. |
| assert len(request.call_args_list) == 2 |
| |
| @mock.patch( |
| "google.auth.metrics.token_request_access_token_impersonate", |
| return_value=IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| ) |
| @mock.patch( |
| "google.auth.metrics.python_and_auth_lib_version", |
| return_value=LANG_LIBRARY_METRICS_HEADER_VALUE, |
| ) |
| def test_refresh_impersonation_with_lifetime( |
| self, mock_metrics_header_value, mock_auth_lib_value |
| ): |
| # Simulate service account access token expires in 2800 seconds. |
| expire_time = ( |
| _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=2800) |
| ).isoformat("T") + "Z" |
| expected_expiry = datetime.datetime.strptime(expire_time, "%Y-%m-%dT%H:%M:%SZ") |
| # STS token exchange request/response. |
| token_response = self.SUCCESS_RESPONSE.copy() |
| token_headers = { |
| "Content-Type": "application/x-www-form-urlencoded", |
| "x-goog-api-client": "gl-python/3.7 auth/1.1 google-byoid-sdk sa-impersonation/true config-lifetime/true", |
| } |
| token_request_data = { |
| "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", |
| "audience": self.AUDIENCE, |
| "requested_token_type": "urn:ietf:params:oauth:token-type:access_token", |
| "subject_token": "subject_token_0", |
| "subject_token_type": self.SUBJECT_TOKEN_TYPE, |
| "scope": "https://www.googleapis.com/auth/iam", |
| } |
| # Service account impersonation request/response. |
| impersonation_response = { |
| "accessToken": "SA_ACCESS_TOKEN", |
| "expireTime": expire_time, |
| } |
| impersonation_headers = { |
| "Content-Type": "application/json", |
| "authorization": "Bearer {}".format(token_response["access_token"]), |
| "x-goog-api-client": IMPERSONATE_ACCESS_TOKEN_REQUEST_METRICS_HEADER_VALUE, |
| } |
| impersonation_request_data = { |
| "delegates": None, |
| "scope": self.SCOPES, |
| "lifetime": "2800s", |
| } |
| # Initialize mock request to handle token exchange and service account |
| # impersonation request. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=token_response, |
| impersonation_status=http_client.OK, |
| impersonation_data=impersonation_response, |
| ) |
| # Initialize credentials with service account impersonation. |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL, |
| service_account_impersonation_options={"token_lifetime_seconds": 2800}, |
| scopes=self.SCOPES, |
| ) |
| |
| credentials.refresh(request) |
| |
| # Only 2 requests should be processed. |
| assert len(request.call_args_list) == 2 |
| # Verify token exchange request parameters. |
| self.assert_token_request_kwargs( |
| request.call_args_list[0][1], token_headers, token_request_data |
| ) |
| # Verify service account impersonation request parameters. |
| self.assert_impersonation_request_kwargs( |
| request.call_args_list[1][1], |
| impersonation_headers, |
| impersonation_request_data, |
| ) |
| assert credentials.valid |
| assert credentials.expiry == expected_expiry |
| assert not credentials.expired |
| assert credentials.token == impersonation_response["accessToken"] |
| |
| def test_get_project_id_cloud_resource_manager_error(self): |
| # Simulate resource doesn't have sufficient permissions to access |
| # cloud resource manager. |
| request = self.make_mock_request( |
| status=http_client.OK, |
| data=self.SUCCESS_RESPONSE.copy(), |
| cloud_resource_manager_status=http_client.UNAUTHORIZED, |
| ) |
| credentials = self.make_credentials(scopes=self.SCOPES) |
| |
| project_id = credentials.get_project_id(request) |
| |
| assert project_id is None |
| # Only 2 requests to STS and cloud resource manager should be sent. |
| assert len(request.call_args_list) == 2 |
| |
| def test_refresh_with_existing_impersonated_credentials(self): |
| credentials = self.make_credentials( |
| service_account_impersonation_url=self.SERVICE_ACCOUNT_IMPERSONATION_URL |
| ) |
| credentials._impersonated_credentials = mock.Mock() |
| request = self.make_mock_request() |
| |
| credentials.refresh(request) |
| |
| credentials._impersonated_credentials.refresh.assert_called_once_with(request) |
| |
| def test_get_mtls_cert_and_key_paths(self): |
| credentials = self.make_credentials() |
| with pytest.raises(NotImplementedError): |
| credentials._get_mtls_cert_and_key_paths() |
| |
| |
| def test_supplier_context(): |
| context = external_account.SupplierContext("TestTokenType", "TestAudience") |
| |
| assert context.subject_token_type == "TestTokenType" |
| assert context.audience == "TestAudience" |