| # Copyright 2020 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """Identity Pool Credentials. |
| |
| This module provides credentials to access Google Cloud resources from on-prem |
| or non-Google Cloud platforms which support external credentials (e.g. OIDC ID |
| tokens) retrieved from local file locations or local servers. This includes |
| Microsoft Azure and OIDC identity providers (e.g. K8s workloads registered with |
| Hub with Hub workload identity enabled). |
| |
| These credentials are recommended over the use of service account credentials |
| in on-prem/non-Google Cloud platforms as they do not involve the management of |
| long-live service account private keys. |
| |
| Identity Pool Credentials are initialized using external_account |
| arguments which are typically loaded from an external credentials file or |
| an external credentials URL. |
| |
| This module also provides a definition for an abstract subject token supplier. |
| This supplier can be implemented to return a valid OIDC or SAML2.0 subject token |
| and used to create Identity Pool credentials. The credentials will then call the |
| supplier instead of using pre-defined methods such as reading a local file or |
| calling a URL. |
| """ |
| |
| try: |
| from collections.abc import Mapping |
| # Python 2.7 compatibility |
| except ImportError: # pragma: NO COVER |
| from collections import Mapping # type: ignore |
| import abc |
| import base64 |
| import json |
| import os |
| from typing import NamedTuple |
| |
| from google.auth import _helpers |
| from google.auth import exceptions |
| from google.auth import external_account |
| from google.auth.transport import _mtls_helper |
| |
| |
| class SubjectTokenSupplier(metaclass=abc.ABCMeta): |
| """Base class for subject token suppliers. This can be implemented with custom logic to retrieve |
| a subject token to exchange for a Google Cloud access token when using Workload or |
| Workforce Identity Federation. The identity pool credential does not cache the subject token, |
| so caching logic should be added in the implementation. |
| """ |
| |
| @abc.abstractmethod |
| def get_subject_token(self, context, request): |
| """Returns the requested subject token. The subject token must be valid. |
| |
| .. warning: This is not cached by the calling Google credential, so caching logic should be implemented in the supplier. |
| |
| Args: |
| context (google.auth.externalaccount.SupplierContext): The context object |
| containing information about the requested audience and subject token type. |
| request (google.auth.transport.Request): The object used to make |
| HTTP requests. |
| |
| Raises: |
| google.auth.exceptions.RefreshError: If an error is encountered during |
| subject token retrieval logic. |
| |
| Returns: |
| str: The requested subject token string. |
| """ |
| raise NotImplementedError("") |
| |
| |
| class _TokenContent(NamedTuple): |
| """Models the token content response from file and url internal suppliers. |
| Attributes: |
| content (str): The string content of the file or URL response. |
| location (str): The location the content was retrieved from. This will either be a file location or a URL. |
| """ |
| |
| content: str |
| location: str |
| |
| |
| class _FileSupplier(SubjectTokenSupplier): |
| """Internal implementation of subject token supplier which supports reading a subject token from a file.""" |
| |
| def __init__(self, path, format_type, subject_token_field_name): |
| self._path = path |
| self._format_type = format_type |
| self._subject_token_field_name = subject_token_field_name |
| |
| @_helpers.copy_docstring(SubjectTokenSupplier) |
| def get_subject_token(self, context, request): |
| if not os.path.exists(self._path): |
| raise exceptions.RefreshError("File '{}' was not found.".format(self._path)) |
| |
| with open(self._path, "r", encoding="utf-8") as file_obj: |
| token_content = _TokenContent(file_obj.read(), self._path) |
| |
| return _parse_token_data( |
| token_content, self._format_type, self._subject_token_field_name |
| ) |
| |
| |
| class _UrlSupplier(SubjectTokenSupplier): |
| """Internal implementation of subject token supplier which supports retrieving a subject token by calling a URL endpoint.""" |
| |
| def __init__(self, url, format_type, subject_token_field_name, headers): |
| self._url = url |
| self._format_type = format_type |
| self._subject_token_field_name = subject_token_field_name |
| self._headers = headers |
| |
| @_helpers.copy_docstring(SubjectTokenSupplier) |
| def get_subject_token(self, context, request): |
| response = request(url=self._url, method="GET", headers=self._headers) |
| |
| # support both string and bytes type response.data |
| response_body = ( |
| response.data.decode("utf-8") |
| if hasattr(response.data, "decode") |
| else response.data |
| ) |
| |
| if response.status != 200: |
| raise exceptions.RefreshError( |
| "Unable to retrieve Identity Pool subject token", response_body |
| ) |
| token_content = _TokenContent(response_body, self._url) |
| return _parse_token_data( |
| token_content, self._format_type, self._subject_token_field_name |
| ) |
| |
| |
| class _X509Supplier(SubjectTokenSupplier): |
| """Internal supplier for X509 workload credentials. This class is used internally and always returns an empty string as the subject token.""" |
| |
| def __init__(self, trust_chain_path, leaf_cert_callback): |
| self._trust_chain_path = trust_chain_path |
| self._leaf_cert_callback = leaf_cert_callback |
| |
| @_helpers.copy_docstring(SubjectTokenSupplier) |
| def get_subject_token(self, context, request): |
| # Import OpennSSL inline because it is an extra import only required by customers |
| # using mTLS. |
| from OpenSSL import crypto |
| |
| leaf_cert = crypto.load_certificate( |
| crypto.FILETYPE_PEM, self._leaf_cert_callback() |
| ) |
| trust_chain = self._read_trust_chain() |
| cert_chain = [] |
| |
| cert_chain.append(_X509Supplier._encode_cert(leaf_cert)) |
| |
| if trust_chain is None or len(trust_chain) == 0: |
| return json.dumps(cert_chain) |
| |
| # Append the first cert if it is not the leaf cert. |
| first_cert = _X509Supplier._encode_cert(trust_chain[0]) |
| if first_cert != cert_chain[0]: |
| cert_chain.append(first_cert) |
| |
| for i in range(1, len(trust_chain)): |
| encoded = _X509Supplier._encode_cert(trust_chain[i]) |
| # Check if the current cert is the leaf cert and raise an exception if it is. |
| if encoded == cert_chain[0]: |
| raise exceptions.RefreshError( |
| "The leaf certificate must be at the top of the trust chain file" |
| ) |
| else: |
| cert_chain.append(encoded) |
| return json.dumps(cert_chain) |
| |
| def _read_trust_chain(self): |
| # Import OpennSSL inline because it is an extra import only required by customers |
| # using mTLS. |
| from OpenSSL import crypto |
| |
| certificate_trust_chain = [] |
| # If no trust chain path was provided, return an empty list. |
| if self._trust_chain_path is None or self._trust_chain_path == "": |
| return certificate_trust_chain |
| try: |
| # Open the trust chain file. |
| with open(self._trust_chain_path, "rb") as f: |
| trust_chain_data = f.read() |
| # Split PEM data into individual certificates. |
| cert_blocks = trust_chain_data.split(b"-----BEGIN CERTIFICATE-----") |
| for cert_block in cert_blocks: |
| # Skip empty blocks. |
| if cert_block.strip(): |
| cert_data = b"-----BEGIN CERTIFICATE-----" + cert_block |
| try: |
| # Load each certificate and add it to the trust chain. |
| cert = crypto.load_certificate( |
| crypto.FILETYPE_PEM, cert_data |
| ) |
| certificate_trust_chain.append(cert) |
| except Exception as e: |
| raise exceptions.RefreshError( |
| "Error loading PEM certificates from the trust chain file '{}'".format( |
| self._trust_chain_path |
| ) |
| ) from e |
| return certificate_trust_chain |
| except FileNotFoundError: |
| raise exceptions.RefreshError( |
| "Trust chain file '{}' was not found.".format(self._trust_chain_path) |
| ) |
| |
| def _encode_cert(cert): |
| # Import OpennSSL inline because it is an extra import only required by customers |
| # using mTLS. |
| from OpenSSL import crypto |
| |
| return base64.b64encode( |
| crypto.dump_certificate(crypto.FILETYPE_ASN1, cert) |
| ).decode("utf-8") |
| |
| |
| def _parse_token_data(token_content, format_type="text", subject_token_field_name=None): |
| if format_type == "text": |
| token = token_content.content |
| else: |
| try: |
| # Parse file content as JSON. |
| response_data = json.loads(token_content.content) |
| # Get the subject_token. |
| token = response_data[subject_token_field_name] |
| except (KeyError, ValueError): |
| raise exceptions.RefreshError( |
| "Unable to parse subject_token from JSON file '{}' using key '{}'".format( |
| token_content.location, subject_token_field_name |
| ) |
| ) |
| if not token: |
| raise exceptions.RefreshError( |
| "Missing subject_token in the credential_source file" |
| ) |
| return token |
| |
| |
| class Credentials(external_account.Credentials): |
| """External account credentials sourced from files and URLs. |
| |
| **IMPORTANT**: |
| This class does not validate the credential configuration. A security |
| risk occurs when a credential configuration configured with malicious urls |
| is used. |
| When the credential configuration is accepted from an |
| untrusted source, you should validate it before using. |
| Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. |
| """ |
| |
| def __init__( |
| self, |
| audience, |
| subject_token_type, |
| token_url=external_account._DEFAULT_TOKEN_URL, |
| credential_source=None, |
| subject_token_supplier=None, |
| *args, |
| **kwargs |
| ): |
| """Instantiates an external account credentials object from a file/URL. |
| |
| Args: |
| audience (str): The STS audience field. |
| subject_token_type (str): The subject token type based on the Oauth2.0 token exchange spec. |
| Expected values include:: |
| |
| “urn:ietf:params:oauth:token-type:jwt” |
| “urn:ietf:params:oauth:token-type:id-token” |
| “urn:ietf:params:oauth:token-type:saml2” |
| |
| token_url (Optional [str]): The STS endpoint URL. If not provided, will default to "https://sts.googleapis.com/v1/token". |
| credential_source (Optional [Mapping]): The credential source dictionary used to |
| provide instructions on how to retrieve external credential to be |
| exchanged for Google access tokens. Either a credential source or |
| a subject token supplier must be provided. |
| |
| Example credential_source for url-sourced credential:: |
| |
| { |
| "url": "http://www.example.com", |
| "format": { |
| "type": "json", |
| "subject_token_field_name": "access_token", |
| }, |
| "headers": {"foo": "bar"}, |
| } |
| |
| Example credential_source for file-sourced credential:: |
| |
| { |
| "file": "/path/to/token/file.txt" |
| } |
| subject_token_supplier (Optional [SubjectTokenSupplier]): Optional subject token supplier. |
| This will be called to supply a valid subject token which will then |
| be exchanged for Google access tokens. Either a subject token supplier |
| or a credential source must be provided. |
| args (List): Optional positional arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method. |
| kwargs (Mapping): Optional keyword arguments passed into the underlying :meth:`~external_account.Credentials.__init__` method. |
| |
| Raises: |
| google.auth.exceptions.RefreshError: If an error is encountered during |
| access token retrieval logic. |
| ValueError: For invalid parameters. |
| |
| .. note:: Typically one of the helper constructors |
| :meth:`from_file` or |
| :meth:`from_info` are used instead of calling the constructor directly. |
| """ |
| |
| super(Credentials, self).__init__( |
| audience=audience, |
| subject_token_type=subject_token_type, |
| token_url=token_url, |
| credential_source=credential_source, |
| *args, |
| **kwargs |
| ) |
| if credential_source is None and subject_token_supplier is None: |
| raise exceptions.InvalidValue( |
| "A valid credential source or a subject token supplier must be provided." |
| ) |
| if credential_source is not None and subject_token_supplier is not None: |
| raise exceptions.InvalidValue( |
| "Identity pool credential cannot have both a credential source and a subject token supplier." |
| ) |
| |
| if subject_token_supplier is not None: |
| self._subject_token_supplier = subject_token_supplier |
| self._credential_source_file = None |
| self._credential_source_url = None |
| self._credential_source_certificate = None |
| else: |
| if not isinstance(credential_source, Mapping): |
| self._credential_source_executable = None |
| raise exceptions.MalformedError( |
| "Invalid credential_source. The credential_source is not a dict." |
| ) |
| self._credential_source_file = credential_source.get("file") |
| self._credential_source_url = credential_source.get("url") |
| self._credential_source_certificate = credential_source.get("certificate") |
| |
| # environment_id is only supported in AWS or dedicated future external |
| # account credentials. |
| if "environment_id" in credential_source: |
| raise exceptions.MalformedError( |
| "Invalid Identity Pool credential_source field 'environment_id'" |
| ) |
| |
| # check that only one of file, url, or certificate are provided. |
| self._validate_single_source() |
| |
| if self._credential_source_certificate: |
| self._validate_certificate_config() |
| else: |
| self._validate_file_or_url_config(credential_source) |
| |
| if self._credential_source_file: |
| self._subject_token_supplier = _FileSupplier( |
| self._credential_source_file, |
| self._credential_source_format_type, |
| self._credential_source_field_name, |
| ) |
| elif self._credential_source_url: |
| self._subject_token_supplier = _UrlSupplier( |
| self._credential_source_url, |
| self._credential_source_format_type, |
| self._credential_source_field_name, |
| self._credential_source_headers, |
| ) |
| else: # self._credential_source_certificate |
| self._subject_token_supplier = _X509Supplier( |
| self._trust_chain_path, self._get_cert_bytes |
| ) |
| |
| @_helpers.copy_docstring(external_account.Credentials) |
| def retrieve_subject_token(self, request): |
| return self._subject_token_supplier.get_subject_token( |
| self._supplier_context, request |
| ) |
| |
| def _get_mtls_cert_and_key_paths(self): |
| if self._credential_source_certificate is None: |
| raise exceptions.RefreshError( |
| 'The credential is not configured to use mtls requests. The credential should include a "certificate" section in the credential source.' |
| ) |
| else: |
| return _mtls_helper._get_workload_cert_and_key_paths( |
| self._certificate_config_location |
| ) |
| |
| def _get_cert_bytes(self): |
| cert_path, _ = self._get_mtls_cert_and_key_paths() |
| return _mtls_helper._read_cert_file(cert_path) |
| |
| def _mtls_required(self): |
| return self._credential_source_certificate is not None |
| |
| def _create_default_metrics_options(self): |
| metrics_options = super(Credentials, self)._create_default_metrics_options() |
| # Check that credential source is a dict before checking for credential type. This check needs to be done |
| # here because the external_account credential constructor needs to pass the metrics options to the |
| # impersonated credential object before the identity_pool credentials are validated. |
| if isinstance(self._credential_source, Mapping): |
| if self._credential_source.get("file"): |
| metrics_options["source"] = "file" |
| elif self._credential_source.get("url"): |
| metrics_options["source"] = "url" |
| else: |
| metrics_options["source"] = "x509" |
| else: |
| metrics_options["source"] = "programmatic" |
| return metrics_options |
| |
| def _has_custom_supplier(self): |
| return self._credential_source is None |
| |
| def _constructor_args(self): |
| args = super(Credentials, self)._constructor_args() |
| # If a custom supplier was used, append it to the args dict. |
| if self._has_custom_supplier(): |
| args.update({"subject_token_supplier": self._subject_token_supplier}) |
| return args |
| |
| def _validate_certificate_config(self): |
| self._certificate_config_location = self._credential_source_certificate.get( |
| "certificate_config_location" |
| ) |
| use_default = self._credential_source_certificate.get( |
| "use_default_certificate_config" |
| ) |
| self._trust_chain_path = self._credential_source_certificate.get( |
| "trust_chain_path" |
| ) |
| if self._certificate_config_location and use_default: |
| raise exceptions.MalformedError( |
| "Invalid certificate configuration, certificate_config_location cannot be specified when use_default_certificate_config = true." |
| ) |
| if not self._certificate_config_location and not use_default: |
| raise exceptions.MalformedError( |
| "Invalid certificate configuration, use_default_certificate_config should be true if no certificate_config_location is provided." |
| ) |
| |
| def _validate_file_or_url_config(self, credential_source): |
| self._credential_source_headers = credential_source.get("headers") |
| credential_source_format = credential_source.get("format", {}) |
| # Get credential_source format type. When not provided, this |
| # defaults to text. |
| self._credential_source_format_type = ( |
| credential_source_format.get("type") or "text" |
| ) |
| if self._credential_source_format_type not in ["text", "json"]: |
| raise exceptions.MalformedError( |
| "Invalid credential_source format '{}'".format( |
| self._credential_source_format_type |
| ) |
| ) |
| # For JSON types, get the required subject_token field name. |
| if self._credential_source_format_type == "json": |
| self._credential_source_field_name = credential_source_format.get( |
| "subject_token_field_name" |
| ) |
| if self._credential_source_field_name is None: |
| raise exceptions.MalformedError( |
| "Missing subject_token_field_name for JSON credential_source format" |
| ) |
| else: |
| self._credential_source_field_name = None |
| |
| def _validate_single_source(self): |
| credential_sources = [ |
| self._credential_source_file, |
| self._credential_source_url, |
| self._credential_source_certificate, |
| ] |
| valid_credential_sources = list( |
| filter(lambda source: source is not None, credential_sources) |
| ) |
| |
| if len(valid_credential_sources) > 1: |
| raise exceptions.MalformedError( |
| "Ambiguous credential_source. 'file', 'url', and 'certificate' are mutually exclusive.." |
| ) |
| if len(valid_credential_sources) != 1: |
| raise exceptions.MalformedError( |
| "Missing credential_source. A 'file', 'url', or 'certificate' must be provided." |
| ) |
| |
| @classmethod |
| def from_info(cls, info, **kwargs): |
| """Creates an Identity Pool Credentials instance from parsed external account info. |
| |
| **IMPORTANT**: |
| This method does not validate the credential configuration. A security |
| risk occurs when a credential configuration configured with malicious urls |
| is used. |
| When the credential configuration is accepted from an |
| untrusted source, you should validate it before using with this method. |
| Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. |
| |
| Args: |
| info (Mapping[str, str]): The Identity Pool external account info in Google |
| format. |
| kwargs: Additional arguments to pass to the constructor. |
| |
| Returns: |
| google.auth.identity_pool.Credentials: The constructed |
| credentials. |
| |
| Raises: |
| ValueError: For invalid parameters. |
| """ |
| subject_token_supplier = info.get("subject_token_supplier") |
| kwargs.update({"subject_token_supplier": subject_token_supplier}) |
| return super(Credentials, cls).from_info(info, **kwargs) |
| |
| @classmethod |
| def from_file(cls, filename, **kwargs): |
| """Creates an IdentityPool Credentials instance from an external account json file. |
| |
| **IMPORTANT**: |
| This method does not validate the credential configuration. A security |
| risk occurs when a credential configuration configured with malicious urls |
| is used. |
| When the credential configuration is accepted from an |
| untrusted source, you should validate it before using with this method. |
| Refer https://cloud.google.com/docs/authentication/external/externally-sourced-credentials for more details. |
| |
| Args: |
| filename (str): The path to the IdentityPool external account json file. |
| kwargs: Additional arguments to pass to the constructor. |
| |
| Returns: |
| google.auth.identity_pool.Credentials: The constructed |
| credentials. |
| """ |
| return super(Credentials, cls).from_file(filename, **kwargs) |
| |
| def refresh(self, request): |
| """Refreshes the access token. |
| |
| Args: |
| request (google.auth.transport.Request): The object used to make |
| HTTP requests. |
| """ |
| from google.auth import _agent_identity_utils |
| |
| cert_fingerprint = None |
| # Check if the credential is X.509 based. |
| if self._credential_source_certificate is not None: |
| cert_bytes = self._get_cert_bytes() |
| cert = _agent_identity_utils.parse_certificate(cert_bytes) |
| if _agent_identity_utils.should_request_bound_token(cert): |
| cert_fingerprint = ( |
| _agent_identity_utils.calculate_certificate_fingerprint(cert) |
| ) |
| |
| self._perform_refresh_token(request, cert_fingerprint=cert_fingerprint) |
| self._handle_trust_boundary(request) |