fix: ADC with impersonated workforce pools (#877)
While service account impersonation is uncommonly used with workforce
pool external credentials, there is a bug where the following commands
raise exceptions when impersonated workforce pools are used:
- `google.auth.default()`
- `google.auth.load_credentials_from_file()`
The issue is due to `google.auth.aws.Credentials` not supporting the
`workforce_pool_user_project` argument in the constructor, unlike
`google.auth.identity_pool.Credentials`.
That argument was indirectly passed here:
https://github.com/googleapis/google-auth-library-python/blob/a37ff00d7afd6c7aac2d0fab29e05708bbc068be/google/auth/external_account.py#L395
This caused a TypeError to be raised (we only catch ValueError).
Updated the credential determination logic to explicitly check the
subject token type. This is a more reliable indicator than a
try/except.
Increased unit test coverage in tests/test__default.py to cover these
credentials.
diff --git a/tests/test__default.py b/tests/test__default.py
index c70ceaa..1ce03cf 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -55,6 +55,10 @@
SUBJECT_TOKEN_TEXT_FILE = os.path.join(DATA_DIR, "external_subject_token.txt")
TOKEN_URL = "https://sts.googleapis.com/v1/token"
AUDIENCE = "//iam.googleapis.com/projects/123456/locations/global/workloadIdentityPools/POOL_ID/providers/PROVIDER_ID"
+WORKFORCE_AUDIENCE = (
+ "//iam.googleapis.com/locations/global/workforcePools/POOL_ID/providers/PROVIDER_ID"
+)
+WORKFORCE_POOL_USER_PROJECT = "WORKFORCE_POOL_USER_PROJECT_NUMBER"
REGION_URL = "http://169.254.169.254/latest/meta-data/placement/availability-zone"
SECURITY_CREDS_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials"
CRED_VERIFICATION_URL = (
@@ -79,6 +83,49 @@
"regional_cred_verification_url": CRED_VERIFICATION_URL,
},
}
+SERVICE_ACCOUNT_EMAIL = "service-1234@service-name.iam.gserviceaccount.com"
+SERVICE_ACCOUNT_IMPERSONATION_URL = (
+ "https://us-east1-iamcredentials.googleapis.com/v1/projects/-"
+ + "/serviceAccounts/{}:generateAccessToken".format(SERVICE_ACCOUNT_EMAIL)
+)
+IMPERSONATED_IDENTITY_POOL_DATA = {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": "urn:ietf:params:oauth:token-type:jwt",
+ "token_url": TOKEN_URL,
+ "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
+ "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
+}
+IMPERSONATED_AWS_DATA = {
+ "type": "external_account",
+ "audience": AUDIENCE,
+ "subject_token_type": "urn:ietf:params:aws:token-type:aws4_request",
+ "token_url": TOKEN_URL,
+ "credential_source": {
+ "environment_id": "aws1",
+ "region_url": REGION_URL,
+ "url": SECURITY_CREDS_URL,
+ "regional_cred_verification_url": CRED_VERIFICATION_URL,
+ },
+ "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
+}
+IDENTITY_POOL_WORKFORCE_DATA = {
+ "type": "external_account",
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
+ "token_url": TOKEN_URL,
+ "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+}
+IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA = {
+ "type": "external_account",
+ "audience": WORKFORCE_AUDIENCE,
+ "subject_token_type": "urn:ietf:params:oauth:token-type:id_token",
+ "token_url": TOKEN_URL,
+ "credential_source": {"file": SUBJECT_TOKEN_TEXT_FILE},
+ "service_account_impersonation_url": SERVICE_ACCOUNT_IMPERSONATION_URL,
+ "workforce_pool_user_project": WORKFORCE_POOL_USER_PROJECT,
+}
MOCK_CREDENTIALS = mock.Mock(spec=credentials.CredentialsWithQuotaProject)
MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS
@@ -257,6 +304,68 @@
@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_identity_pool_impersonated(
+ get_project_id, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
+ credentials, project_id = _default.load_credentials_from_file(str(config_file))
+
+ assert isinstance(credentials, identity_pool.Credentials)
+ assert not credentials.is_user
+ assert not credentials.is_workforce_pool
+ # Since no scopes are specified, the project ID cannot be determined.
+ assert project_id is None
+ assert get_project_id.called
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_aws_impersonated(
+ get_project_id, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IMPERSONATED_AWS_DATA))
+ credentials, project_id = _default.load_credentials_from_file(str(config_file))
+
+ assert isinstance(credentials, aws.Credentials)
+ assert not credentials.is_user
+ assert not credentials.is_workforce_pool
+ # Since no scopes are specified, the project ID cannot be determined.
+ assert project_id is None
+ assert get_project_id.called
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_workforce(get_project_id, tmpdir):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IDENTITY_POOL_WORKFORCE_DATA))
+ credentials, project_id = _default.load_credentials_from_file(str(config_file))
+
+ assert isinstance(credentials, identity_pool.Credentials)
+ assert credentials.is_user
+ assert credentials.is_workforce_pool
+ # Since no scopes are specified, the project ID cannot be determined.
+ assert project_id is None
+ assert get_project_id.called
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_load_credentials_from_file_external_account_workforce_impersonated(
+ get_project_id, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA))
+ credentials, project_id = _default.load_credentials_from_file(str(config_file))
+
+ assert isinstance(credentials, identity_pool.Credentials)
+ assert not credentials.is_user
+ assert credentials.is_workforce_pool
+ # Since no scopes are specified, the project ID cannot be determined.
+ assert project_id is None
+ assert get_project_id.called
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_load_credentials_from_file_external_account_with_user_and_default_scopes(
get_project_id, tmpdir
):
@@ -718,7 +827,9 @@
@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
-def test_default_environ_external_credentials(get_project_id, monkeypatch, tmpdir):
+def test_default_environ_external_credentials_identity_pool(
+ get_project_id, monkeypatch, tmpdir
+):
config_file = tmpdir.join("config.json")
config_file.write(json.dumps(IDENTITY_POOL_DATA))
monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
@@ -726,11 +837,89 @@
credentials, project_id = _default.default()
assert isinstance(credentials, identity_pool.Credentials)
+ assert not credentials.is_user
+ assert not credentials.is_workforce_pool
# Without scopes, project ID cannot be determined.
assert project_id is None
@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials_identity_pool_impersonated(
+ get_project_id, monkeypatch, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_DATA))
+ monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+ credentials, project_id = _default.default(
+ scopes=["https://www.google.com/calendar/feeds"]
+ )
+
+ assert isinstance(credentials, identity_pool.Credentials)
+ assert not credentials.is_user
+ assert not credentials.is_workforce_pool
+ assert project_id is mock.sentinel.project_id
+ assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials_aws_impersonated(
+ get_project_id, monkeypatch, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IMPERSONATED_AWS_DATA))
+ monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+ credentials, project_id = _default.default(
+ scopes=["https://www.google.com/calendar/feeds"]
+ )
+
+ assert isinstance(credentials, aws.Credentials)
+ assert not credentials.is_user
+ assert not credentials.is_workforce_pool
+ assert project_id is mock.sentinel.project_id
+ assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials_workforce(
+ get_project_id, monkeypatch, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IDENTITY_POOL_WORKFORCE_DATA))
+ monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+ credentials, project_id = _default.default(
+ scopes=["https://www.google.com/calendar/feeds"]
+ )
+
+ assert isinstance(credentials, identity_pool.Credentials)
+ assert credentials.is_user
+ assert credentials.is_workforce_pool
+ assert project_id is mock.sentinel.project_id
+ assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
+def test_default_environ_external_credentials_workforce_impersonated(
+ get_project_id, monkeypatch, tmpdir
+):
+ config_file = tmpdir.join("config.json")
+ config_file.write(json.dumps(IMPERSONATED_IDENTITY_POOL_WORKFORCE_DATA))
+ monkeypatch.setenv(environment_vars.CREDENTIALS, str(config_file))
+
+ credentials, project_id = _default.default(
+ scopes=["https://www.google.com/calendar/feeds"]
+ )
+
+ assert isinstance(credentials, identity_pool.Credentials)
+ assert not credentials.is_user
+ assert credentials.is_workforce_pool
+ assert project_id is mock.sentinel.project_id
+ assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
+
+
+@EXTERNAL_ACCOUNT_GET_PROJECT_ID_PATCH
def test_default_environ_external_credentials_with_user_and_default_scopes_and_quota_project_id(
get_project_id, monkeypatch, tmpdir
):