feat: add quota project to base credentials class (#546)

diff --git a/.gitignore b/.gitignore
index 8274565..f01e60e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -13,6 +13,8 @@
 .tox/
 .cache/
 .pytest_cache/
+cert_path
+key_path
 
 # Django test database
 db.sqlite3
diff --git a/google/auth/_default.py b/google/auth/_default.py
index d3bbbda..f3e498b 100644
--- a/google/auth/_default.py
+++ b/google/auth/_default.py
@@ -69,7 +69,7 @@
         warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
 
 
-def load_credentials_from_file(filename, scopes=None):
+def load_credentials_from_file(filename, scopes=None, quota_project_id=None):
     """Loads Google credentials from a file.
 
     The credentials file must be a service account key or stored authorized
@@ -79,7 +79,9 @@
         filename (str): The full path to the credentials file.
         scopes (Optional[Sequence[str]]): The list of scopes for the credentials. If
             specified, the credentials will automatically be scoped if
-            necessary.
+            necessary
+        quota_project_id (Optional[str]):  The project ID used for
+                quota and billing.
 
     Returns:
         Tuple[google.auth.credentials.Credentials, Optional[str]]: Loaded
@@ -114,7 +116,7 @@
         try:
             credentials = credentials.Credentials.from_authorized_user_info(
                 info, scopes=scopes
-            )
+            ).with_quota_project(quota_project_id)
         except ValueError as caught_exc:
             msg = "Failed to load authorized user credentials from {}".format(filename)
             new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
@@ -129,7 +131,7 @@
         try:
             credentials = service_account.Credentials.from_service_account_info(
                 info, scopes=scopes
-            )
+            ).with_quota_project(quota_project_id)
         except ValueError as caught_exc:
             msg = "Failed to load service account credentials from {}".format(filename)
             new_exc = exceptions.DefaultCredentialsError(msg, caught_exc)
@@ -226,7 +228,7 @@
         return None, None
 
 
-def default(scopes=None, request=None):
+def default(scopes=None, request=None, quota_project_id=None):
     """Gets the default credentials for the current environment.
 
     `Application Default Credentials`_ provides an easy way to obtain
@@ -286,7 +288,8 @@
             HTTP requests. This is used to detect whether the application
             is running on Compute Engine. If not specified, then it will
             use the standard library http client to make requests.
-
+        quota_project_id (Optional[str]):  The project ID used for
+            quota and billing.
     Returns:
         Tuple[~google.auth.credentials.Credentials, Optional[str]]:
             the current environment's credentials and project ID. Project ID
@@ -314,7 +317,9 @@
     for checker in checkers:
         credentials, project_id = checker()
         if credentials is not None:
-            credentials = with_scopes_if_required(credentials, scopes)
+            credentials = with_scopes_if_required(
+                credentials, scopes
+            ).with_quota_project(quota_project_id)
             effective_project_id = explicit_project_id or project_id
             if not effective_project_id:
                 _LOGGER.warning(
diff --git a/google/auth/app_engine.py b/google/auth/app_engine.py
index ab69951..fae00d0 100644
--- a/google/auth/app_engine.py
+++ b/google/auth/app_engine.py
@@ -84,7 +84,7 @@
     tokens.
     """
 
-    def __init__(self, scopes=None, service_account_id=None):
+    def __init__(self, scopes=None, service_account_id=None, quota_project_id=None):
         """
         Args:
             scopes (Sequence[str]): Scopes to request from the App Identity
@@ -93,6 +93,8 @@
                 :func:`google.appengine.api.app_identity.get_access_token`.
                 If not specified, the default application service account
                 ID will be used.
+            quota_project_id (Optional[str]): The project ID used for quota
+                and billing.
 
         Raises:
             EnvironmentError: If the App Engine APIs are unavailable.
@@ -107,6 +109,7 @@
         self._scopes = scopes
         self._service_account_id = service_account_id
         self._signer = Signer()
+        self._quota_project_id = quota_project_id
 
     @_helpers.copy_docstring(credentials.Credentials)
     def refresh(self, request):
@@ -137,7 +140,17 @@
     @_helpers.copy_docstring(credentials.Scoped)
     def with_scopes(self, scopes):
         return self.__class__(
-            scopes=scopes, service_account_id=self._service_account_id
+            scopes=scopes,
+            service_account_id=self._service_account_id,
+            quota_project_id=self.quota_project_id,
+        )
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(
+            scopes=self._scopes,
+            service_account_id=self._service_account_id,
+            quota_project_id=quota_project_id,
         )
 
     @_helpers.copy_docstring(credentials.Signing)
diff --git a/google/auth/compute_engine/credentials.py b/google/auth/compute_engine/credentials.py
index 1550465..e6da238 100644
--- a/google/auth/compute_engine/credentials.py
+++ b/google/auth/compute_engine/credentials.py
@@ -54,15 +54,18 @@
         https://cloud.google.com/compute/docs/authentication#using
     """
 
-    def __init__(self, service_account_email="default"):
+    def __init__(self, service_account_email="default", quota_project_id=None):
         """
         Args:
             service_account_email (str): The service account email to use, or
                 'default'. A Compute Engine instance may have multiple service
                 accounts.
+            quota_project_id (Optional[str]): The project ID used for quota and
+                billing.
         """
         super(Credentials, self).__init__()
         self._service_account_email = service_account_email
+        self._quota_project_id = quota_project_id
 
     def _retrieve_info(self, request):
         """Retrieve information about the service account.
@@ -115,6 +118,13 @@
         """False: Compute Engine credentials can not be scoped."""
         return False
 
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(
+            service_account_email=self._service_account_email,
+            quota_project_id=quota_project_id,
+        )
+
 
 _DEFAULT_TOKEN_LIFETIME_SECS = 3600  # 1 hour in seconds
 _DEFAULT_TOKEN_URI = "https://www.googleapis.com/oauth2/v4/token"
@@ -143,6 +153,7 @@
         service_account_email=None,
         signer=None,
         use_metadata_identity_endpoint=False,
+        quota_project_id=None,
     ):
         """
         Args:
@@ -165,6 +176,8 @@
                 is False. If set to True, ``token_uri``, ``additional_claims``,
                 ``service_account_email``, ``signer`` argument should not be set;
                 otherwise ValueError will be raised.
+            quota_project_id (Optional[str]): The project ID used for quota and
+                billing.
 
         Raises:
             ValueError:
@@ -174,6 +187,7 @@
         """
         super(IDTokenCredentials, self).__init__()
 
+        self._quota_project_id = quota_project_id
         self._use_metadata_identity_endpoint = use_metadata_identity_endpoint
         self._target_audience = target_audience
 
@@ -226,6 +240,7 @@
                 None,
                 target_audience=target_audience,
                 use_metadata_identity_endpoint=True,
+                quota_project_id=self._quota_project_id,
             )
         else:
             return self.__class__(
@@ -236,6 +251,31 @@
                 additional_claims=self._additional_claims.copy(),
                 signer=self.signer,
                 use_metadata_identity_endpoint=False,
+                quota_project_id=self._quota_project_id,
+            )
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+
+        # since the signer is already instantiated,
+        # the request is not needed
+        if self._use_metadata_identity_endpoint:
+            return self.__class__(
+                None,
+                target_audience=self._target_audience,
+                use_metadata_identity_endpoint=True,
+                quota_project_id=quota_project_id,
+            )
+        else:
+            return self.__class__(
+                None,
+                service_account_email=self._service_account_email,
+                token_uri=self._token_uri,
+                target_audience=self._target_audience,
+                additional_claims=self._additional_claims.copy(),
+                signer=self.signer,
+                use_metadata_identity_endpoint=False,
+                quota_project_id=quota_project_id,
             )
 
     def _make_authorization_grant_assertion(self):
diff --git a/google/auth/credentials.py b/google/auth/credentials.py
index 3cc976b..3f389b1 100644
--- a/google/auth/credentials.py
+++ b/google/auth/credentials.py
@@ -49,6 +49,8 @@
         self.expiry = None
         """Optional[datetime]: When the token expires and is no longer valid.
         If this is None, the token is assumed to never expire."""
+        self._quota_project_id = None
+        """Optional[str]: Project to use for quota and billing purposes."""
 
     @property
     def expired(self):
@@ -75,6 +77,11 @@
         """
         return self.token is not None and not self.expired
 
+    @property
+    def quota_project_id(self):
+        """Project to use for quota and billing purposes."""
+        return self._quota_project_id
+
     @abc.abstractmethod
     def refresh(self, request):
         """Refreshes the access token.
@@ -102,6 +109,8 @@
         headers["authorization"] = "Bearer {}".format(
             _helpers.from_bytes(token or self.token)
         )
+        if self.quota_project_id:
+            headers["x-goog-user-project"] = self.quota_project_id
 
     def before_request(self, request, method, url, headers):
         """Performs credential-specific before request logic.
@@ -124,6 +133,18 @@
             self.refresh(request)
         self.apply(headers)
 
+    def with_quota_project(self, quota_project_id):
+        """Returns a copy of these credentials with a modified quota project
+
+        Args:
+            quota_project_id (str): The project to use for quota and
+                billing purposes
+
+        Returns:
+            google.oauth2.credentials.Credentials: A new credentials instance.
+        """
+        raise NotImplementedError("This class does not support quota project.")
+
 
 class AnonymousCredentials(Credentials):
     """Credentials that do not provide any authentication information.
@@ -161,6 +182,9 @@
     def before_request(self, request, method, url, headers):
         """Anonymous credentials do nothing to the request."""
 
+    def with_quota_project(self, quota_project_id):
+        raise ValueError("Anonymous credentials don't support quota project.")
+
 
 @six.add_metaclass(abc.ABCMeta)
 class ReadOnlyScoped(object):
diff --git a/google/auth/impersonated_credentials.py b/google/auth/impersonated_credentials.py
index 58e1bab..dbcb291 100644
--- a/google/auth/impersonated_credentials.py
+++ b/google/auth/impersonated_credentials.py
@@ -184,6 +184,7 @@
         target_scopes,
         delegates=None,
         lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
+        quota_project_id=None,
     ):
         """
         Args:
@@ -205,6 +206,9 @@
                 target_principal.
             lifetime (int): Number of seconds the delegated credential should
                 be valid for (upto 3600).
+            quota_project_id (Optional[str]): The project ID used for quota and billing.
+                This project may be different from the project used to
+                create the credentials.
         """
 
         super(Credentials, self).__init__()
@@ -221,6 +225,7 @@
         self._lifetime = lifetime
         self.token = None
         self.expiry = _helpers.utcnow()
+        self._quota_project_id = quota_project_id
 
     @_helpers.copy_docstring(credentials.Credentials)
     def refresh(self, request):
@@ -288,19 +293,38 @@
     def signer(self):
         return self
 
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(
+            self._source_credentials,
+            target_principal=self._target_principal,
+            target_scopes=self._target_scopes,
+            delegates=self._delegates,
+            lifetime=self._lifetime,
+            quota_project_id=quota_project_id,
+        )
+
 
 class IDTokenCredentials(credentials.Credentials):
     """Open ID Connect ID Token-based service account credentials.
 
     """
 
-    def __init__(self, target_credentials, target_audience=None, include_email=False):
+    def __init__(
+        self,
+        target_credentials,
+        target_audience=None,
+        include_email=False,
+        quota_project_id=None,
+    ):
         """
         Args:
             target_credentials (google.auth.Credentials): The target
                 credential used as to acquire the id tokens for.
             target_audience (string): Audience to issue the token for.
             include_email (bool): Include email in IdToken
+            quota_project_id (Optional[str]):  The project ID used for
+                quota and billing.
         """
         super(IDTokenCredentials, self).__init__()
 
@@ -311,15 +335,20 @@
         self._target_credentials = target_credentials
         self._target_audience = target_audience
         self._include_email = include_email
+        self._quota_project_id = quota_project_id
 
     def from_credentials(self, target_credentials, target_audience=None):
         return self.__class__(
-            target_credentials=self._target_credentials, target_audience=target_audience
+            target_credentials=self._target_credentials,
+            target_audience=target_audience,
+            quota_project_id=self._quota_project_id,
         )
 
     def with_target_audience(self, target_audience):
         return self.__class__(
-            target_credentials=self._target_credentials, target_audience=target_audience
+            target_credentials=self._target_credentials,
+            target_audience=target_audience,
+            quota_project_id=self._quota_project_id,
         )
 
     def with_include_email(self, include_email):
@@ -327,6 +356,16 @@
             target_credentials=self._target_credentials,
             target_audience=self._target_audience,
             include_email=include_email,
+            quota_project_id=self._quota_project_id,
+        )
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(
+            target_credentials=self._target_credentials,
+            target_audience=self._target_audience,
+            include_email=self._include_email,
+            quota_project_id=quota_project_id,
         )
 
     @_helpers.copy_docstring(credentials.Credentials)
diff --git a/google/auth/jwt.py b/google/auth/jwt.py
index 24b92eb..35ae034 100644
--- a/google/auth/jwt.py
+++ b/google/auth/jwt.py
@@ -346,6 +346,7 @@
         audience,
         additional_claims=None,
         token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
+        quota_project_id=None,
     ):
         """
         Args:
@@ -358,6 +359,8 @@
                 the JWT payload.
             token_lifetime (int): The amount of time in seconds for
                 which the token is valid. Defaults to 1 hour.
+            quota_project_id (Optional[str]): The project ID used for quota
+                and billing.
         """
         super(Credentials, self).__init__()
         self._signer = signer
@@ -365,6 +368,7 @@
         self._subject = subject
         self._audience = audience
         self._token_lifetime = token_lifetime
+        self._quota_project_id = quota_project_id
 
         if additional_claims is None:
             additional_claims = {}
@@ -486,6 +490,18 @@
             subject=subject if subject is not None else self._subject,
             audience=audience if audience is not None else self._audience,
             additional_claims=new_additional_claims,
+            quota_project_id=self._quota_project_id,
+        )
+
+    @_helpers.copy_docstring(google.auth.credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(
+            self._signer,
+            issuer=self._issuer,
+            subject=self._subject,
+            audience=self._audience,
+            additional_claims=self._additional_claims,
+            quota_project_id=quota_project_id,
         )
 
     def _make_jwt(self):
@@ -565,6 +581,7 @@
         additional_claims=None,
         token_lifetime=_DEFAULT_TOKEN_LIFETIME_SECS,
         max_cache_size=_DEFAULT_MAX_CACHE_SIZE,
+        quota_project_id=None,
     ):
         """
         Args:
@@ -577,12 +594,16 @@
                 which the token is valid. Defaults to 1 hour.
             max_cache_size (int): The maximum number of JWT tokens to keep in
                 cache. Tokens are cached using :class:`cachetools.LRUCache`.
+            quota_project_id (Optional[str]): The project ID used for quota
+                and billing.
+
         """
         super(OnDemandCredentials, self).__init__()
         self._signer = signer
         self._issuer = issuer
         self._subject = subject
         self._token_lifetime = token_lifetime
+        self._quota_project_id = quota_project_id
 
         if additional_claims is None:
             additional_claims = {}
@@ -697,6 +718,19 @@
             subject=subject if subject is not None else self._subject,
             additional_claims=new_additional_claims,
             max_cache_size=self._cache.maxsize,
+            quota_project_id=self._quota_project_id,
+        )
+
+    @_helpers.copy_docstring(google.auth.credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+
+        return self.__class__(
+            self._signer,
+            issuer=self._issuer,
+            subject=self._subject,
+            additional_claims=self._additional_claims,
+            max_cache_size=self._cache.maxsize,
+            quota_project_id=quota_project_id,
         )
 
     @property
diff --git a/google/oauth2/credentials.py b/google/oauth2/credentials.py
index 7572196..6f96275 100644
--- a/google/oauth2/credentials.py
+++ b/google/oauth2/credentials.py
@@ -156,26 +156,14 @@
         return self._client_secret
 
     @property
-    def quota_project_id(self):
-        """Optional[str]: The project to use for quota and billing purposes."""
-        return self._quota_project_id
-
-    @property
     def requires_scopes(self):
         """False: OAuth 2.0 credentials have their scopes set when
         the initial token is requested and can not be changed."""
         return False
 
+    @_helpers.copy_docstring(credentials.Credentials)
     def with_quota_project(self, quota_project_id):
-        """Returns a copy of these credentials with a modified quota project
 
-        Args:
-            quota_project_id (str): The project to use for quota and
-            billing purposes
-
-        Returns:
-            google.oauth2.credentials.Credentials: A new credentials instance.
-        """
         return self.__class__(
             self.token,
             refresh_token=self.refresh_token,
@@ -227,12 +215,6 @@
                     )
                 )
 
-    @_helpers.copy_docstring(credentials.Credentials)
-    def apply(self, headers, token=None):
-        super(Credentials, self).apply(headers, token=token)
-        if self.quota_project_id is not None:
-            headers["x-goog-user-project"] = self.quota_project_id
-
     @classmethod
     def from_authorized_user_info(cls, info, scopes=None):
         """Creates a Credentials instance from parsed authorized user info.
@@ -332,11 +314,15 @@
     Args:
         account (Optional[str]): Account to get the access token for. If not
             specified, the current active account will be used.
+        quota_project_id (Optional[str]): The project ID used for quota
+            and billing.
+
     """
 
-    def __init__(self, account=None):
+    def __init__(self, account=None, quota_project_id=None):
         super(UserAccessTokenCredentials, self).__init__()
         self._account = account
+        self._quota_project_id = quota_project_id
 
     def with_account(self, account):
         """Create a new instance with the given account.
@@ -348,7 +334,11 @@
             google.oauth2.credentials.UserAccessTokenCredentials: The created
                 credentials with the given account.
         """
-        return self.__class__(account=account)
+        return self.__class__(account=account, quota_project_id=self._quota_project_id)
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(account=self._account, quota_project_id=quota_project_id)
 
     def refresh(self, request):
         """Refreshes the access token.
diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py
index 54630d3..2240631 100644
--- a/google/oauth2/service_account.py
+++ b/google/oauth2/service_account.py
@@ -238,11 +238,6 @@
         return self._project_id
 
     @property
-    def quota_project_id(self):
-        """Project ID to use for quota and billing purposes."""
-        return self._quota_project_id
-
-    @property
     def requires_scopes(self):
         """Checks if the credentials requires scopes.
 
@@ -311,17 +306,9 @@
             additional_claims=new_additional_claims,
         )
 
+    @_helpers.copy_docstring(credentials.Credentials)
     def with_quota_project(self, quota_project_id):
-        """Returns a copy of these credentials with a modified quota project.
 
-        Args:
-            quota_project_id (str): The project to use for quota and
-            billing purposes
-
-        Returns:
-            google.auth.service_account.Credentials: A new credentials
-                instance.
-        """
         return self.__class__(
             self._signer,
             service_account_email=self._service_account_email,
@@ -373,12 +360,6 @@
         self.token = access_token
         self.expiry = expiry
 
-    @_helpers.copy_docstring(credentials.Credentials)
-    def apply(self, headers, token=None):
-        super(Credentials, self).apply(headers, token=token)
-        if self.quota_project_id is not None:
-            headers["x-goog-user-project"] = self.quota_project_id
-
     @_helpers.copy_docstring(credentials.Signing)
     def sign_bytes(self, message):
         return self._signer.sign(message)
@@ -443,6 +424,7 @@
         token_uri,
         target_audience,
         additional_claims=None,
+        quota_project_id=None,
     ):
         """
         Args:
@@ -454,7 +436,7 @@
                 will be set to this string.
             additional_claims (Mapping[str, str]): Any additional claims for
                 the JWT assertion used in the authorization grant.
-
+            quota_project_id (Optional[str]): The project ID used for quota and billing.
         .. note:: Typically one of the helper constructors
             :meth:`from_service_account_file` or
             :meth:`from_service_account_info` are used instead of calling the
@@ -465,6 +447,7 @@
         self._service_account_email = service_account_email
         self._token_uri = token_uri
         self._target_audience = target_audience
+        self._quota_project_id = quota_project_id
 
         if additional_claims is not None:
             self._additional_claims = additional_claims
@@ -547,6 +530,18 @@
             token_uri=self._token_uri,
             target_audience=target_audience,
             additional_claims=self._additional_claims.copy(),
+            quota_project_id=self.quota_project_id,
+        )
+
+    @_helpers.copy_docstring(credentials.Credentials)
+    def with_quota_project(self, quota_project_id):
+        return self.__class__(
+            self._signer,
+            service_account_email=self._service_account_email,
+            token_uri=self._token_uri,
+            target_audience=self._target_audience,
+            additional_claims=self._additional_claims.copy(),
+            quota_project_id=quota_project_id,
         )
 
     def _make_authorization_grant_assertion(self):
diff --git a/tests/compute_engine/test_credentials.py b/tests/compute_engine/test_credentials.py
index 363dc9d..4ee6536 100644
--- a/tests/compute_engine/test_credentials.py
+++ b/tests/compute_engine/test_credentials.py
@@ -59,6 +59,8 @@
         assert not self.credentials.requires_scopes
         # Service account email hasn't been populated
         assert self.credentials.service_account_email == "default"
+        # No quota project
+        assert not self.credentials._quota_project_id
 
     @mock.patch(
         "google.auth._helpers.utcnow",
@@ -131,6 +133,11 @@
         # Credentials should now be valid.
         assert self.credentials.valid
 
+    def test_with_quota_project(self):
+        quota_project_creds = self.credentials.with_quota_project("project-foo")
+
+        assert quota_project_creds._quota_project_id == "project-foo"
+
 
 class TestIDTokenCredentials(object):
     credentials = None
@@ -154,6 +161,8 @@
         # Signer is initialized
         assert self.credentials.signer
         assert self.credentials.signer_email == "[email protected]"
+        # No quota project
+        assert not self.credentials._quota_project_id
 
     @mock.patch(
         "google.auth._helpers.utcnow",
@@ -394,6 +403,121 @@
     )
     @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
     @mock.patch("google.auth.iam.Signer.sign", autospec=True)
+    def test_with_quota_project(self, sign, get, utcnow):
+        get.side_effect = [
+            {"email": "[email protected]", "scopes": ["one", "two"]}
+        ]
+        sign.side_effect = [b"signature"]
+
+        request = mock.create_autospec(transport.Request, instance=True)
+        self.credentials = credentials.IDTokenCredentials(
+            request=request, target_audience="https://audience.com"
+        )
+        self.credentials = self.credentials.with_quota_project("project-foo")
+
+        assert self.credentials._quota_project_id == "project-foo"
+
+        # Generate authorization grant:
+        token = self.credentials._make_authorization_grant_assertion()
+        payload = jwt.decode(token, verify=False)
+
+        # The JWT token signature is 'signature' encoded in base 64:
+        assert token.endswith(b".c2lnbmF0dXJl")
+
+        # Check that the credentials have the token and proper expiration
+        assert payload == {
+            "aud": "https://www.googleapis.com/oauth2/v4/token",
+            "exp": 3600,
+            "iat": 0,
+            "iss": "[email protected]",
+            "target_audience": "https://audience.com",
+        }
+
+        # Check that the signer have been initialized with a Request object
+        assert isinstance(self.credentials._signer._request, transport.Request)
+
+    @responses.activate
+    def test_with_quota_project_integration(self):
+        """ Test that it is possible to refresh credentials
+        generated from `with_quota_project`.
+
+        Instead of mocking the methods, the HTTP responses
+        have been mocked.
+        """
+
+        # mock information about credentials
+        responses.add(
+            responses.GET,
+            "http://metadata.google.internal/computeMetadata/v1/instance/"
+            "service-accounts/default/?recursive=true",
+            status=200,
+            content_type="application/json",
+            json={
+                "scopes": "email",
+                "email": "[email protected]",
+                "aliases": ["default"],
+            },
+        )
+
+        # mock token for credentials
+        responses.add(
+            responses.GET,
+            "http://metadata.google.internal/computeMetadata/v1/instance/"
+            "service-accounts/[email protected]/token",
+            status=200,
+            content_type="application/json",
+            json={
+                "access_token": "some-token",
+                "expires_in": 3210,
+                "token_type": "Bearer",
+            },
+        )
+
+        # mock sign blob endpoint
+        signature = base64.b64encode(b"some-signature").decode("utf-8")
+        responses.add(
+            responses.POST,
+            "https://iamcredentials.googleapis.com/v1/projects/-/"
+            "serviceAccounts/[email protected]:signBlob?alt=json",
+            status=200,
+            content_type="application/json",
+            json={"keyId": "some-key-id", "signedBlob": signature},
+        )
+
+        id_token = "{}.{}.{}".format(
+            base64.b64encode(b'{"some":"some"}').decode("utf-8"),
+            base64.b64encode(b'{"exp": 3210}').decode("utf-8"),
+            base64.b64encode(b"token").decode("utf-8"),
+        )
+
+        # mock id token endpoint
+        responses.add(
+            responses.POST,
+            "https://www.googleapis.com/oauth2/v4/token",
+            status=200,
+            content_type="application/json",
+            json={"id_token": id_token, "expiry": 3210},
+        )
+
+        self.credentials = credentials.IDTokenCredentials(
+            request=requests.Request(),
+            service_account_email="[email protected]",
+            target_audience="https://audience.com",
+        )
+
+        self.credentials = self.credentials.with_quota_project("project-foo")
+
+        self.credentials.refresh(requests.Request())
+
+        assert self.credentials.token is not None
+        assert self.credentials._quota_project_id == "project-foo"
+
+    @mock.patch(
+        "google.auth._helpers.utcnow",
+        return_value=datetime.datetime.utcfromtimestamp(0),
+    )
+    @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
+    @mock.patch("google.auth.iam.Signer.sign", autospec=True)
     @mock.patch("google.oauth2._client.id_token_jwt_grant", autospec=True)
     def test_refresh_success(self, id_token_jwt_grant, sign, get, utcnow):
         get.side_effect = [
@@ -551,6 +675,23 @@
     @mock.patch(
         "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
     )
+    def test_id_token_with_quota_project(self, get_service_account_info):
+        get_service_account_info.return_value = {"email": "[email protected]"}
+
+        cred = credentials.IDTokenCredentials(
+            mock.Mock(), "audience", use_metadata_identity_endpoint=True
+        )
+        cred = cred.with_quota_project("project-foo")
+
+        assert cred._quota_project_id == "project-foo"
+        assert cred._use_metadata_identity_endpoint
+        assert cred._signer is None
+        assert cred._token_uri is None
+        assert cred._service_account_email == "[email protected]"
+
+    @mock.patch(
+        "google.auth.compute_engine._metadata.get_service_account_info", autospec=True
+    )
     @mock.patch("google.auth.compute_engine._metadata.get", autospec=True)
     def test_invalid_id_token_from_metadata(self, get, get_service_account_info):
         get.return_value = "invalid_id_token"
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index 78b1012..69d9fbc 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -454,6 +454,13 @@
         cred.refresh(None)
         assert cred.token == "access_token"
 
+    def test_with_quota_project(self):
+        cred = credentials.UserAccessTokenCredentials()
+        quota_project_cred = cred.with_quota_project("project-foo")
+
+        assert quota_project_cred._quota_project_id == "project-foo"
+        assert quota_project_cred._account == cred._account
+
     @mock.patch(
         "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
     )
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 457d472..7f27dad 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -291,6 +291,11 @@
         new_credentials = credentials.with_target_audience("https://new.example.com")
         assert new_credentials._target_audience == "https://new.example.com"
 
+    def test_with_quota_project(self):
+        credentials = self.make_credentials()
+        new_credentials = credentials.with_quota_project("project-foo")
+        assert new_credentials._quota_project_id == "project-foo"
+
     def test__make_authorization_grant_assertion(self):
         credentials = self.make_credentials()
         token = credentials._make_authorization_grant_assertion()
diff --git a/tests/test__default.py b/tests/test__default.py
index 3c87b35..0665efa 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -21,6 +21,7 @@
 from google.auth import _default
 from google.auth import app_engine
 from google.auth import compute_engine
+from google.auth import credentials
 from google.auth import environment_vars
 from google.auth import exceptions
 from google.oauth2 import service_account
@@ -48,9 +49,12 @@
 with open(SERVICE_ACCOUNT_FILE) as fh:
     SERVICE_ACCOUNT_FILE_DATA = json.load(fh)
 
+MOCK_CREDENTIALS = mock.Mock(spec=credentials.Credentials)
+MOCK_CREDENTIALS.with_quota_project.return_value = MOCK_CREDENTIALS
+
 LOAD_FILE_PATCH = mock.patch(
     "google.auth._default.load_credentials_from_file",
-    return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
 
@@ -136,6 +140,16 @@
     assert credentials.scopes == ["https://www.google.com/calendar/feeds"]
 
 
+def test_load_credentials_from_file_authorized_user_cloud_sdk_with_quota_project():
+    credentials, project_id = _default.load_credentials_from_file(
+        AUTHORIZED_USER_CLOUD_SDK_FILE, quota_project_id="project-foo"
+    )
+
+    assert isinstance(credentials, google.oauth2.credentials.Credentials)
+    assert project_id is None
+    assert credentials.quota_project_id == "project-foo"
+
+
 def test_load_credentials_from_file_service_account():
     credentials, project_id = _default.load_credentials_from_file(SERVICE_ACCOUNT_FILE)
     assert isinstance(credentials, service_account.Credentials)
@@ -173,19 +187,19 @@
 
     credentials, project_id = _default._get_explicit_environ_credentials()
 
-    assert credentials is mock.sentinel.credentials
+    assert credentials is MOCK_CREDENTIALS
     assert project_id is mock.sentinel.project_id
     load.assert_called_with("filename")
 
 
 @LOAD_FILE_PATCH
 def test__get_explicit_environ_credentials_no_project_id(load, monkeypatch):
-    load.return_value = mock.sentinel.credentials, None
+    load.return_value = MOCK_CREDENTIALS, None
     monkeypatch.setenv(environment_vars.CREDENTIALS, "filename")
 
     credentials, project_id = _default._get_explicit_environ_credentials()
 
-    assert credentials is mock.sentinel.credentials
+    assert credentials is MOCK_CREDENTIALS
     assert project_id is None
 
 
@@ -198,7 +212,7 @@
 
     credentials, project_id = _default._get_gcloud_sdk_credentials()
 
-    assert credentials is mock.sentinel.credentials
+    assert credentials is MOCK_CREDENTIALS
     assert project_id is mock.sentinel.project_id
     load.assert_called_with(SERVICE_ACCOUNT_FILE)
 
@@ -226,11 +240,11 @@
 def test__get_gcloud_sdk_credentials_project_id(load, unused_isfile, get_project_id):
     # Don't return a project ID from load file, make the function check
     # the Cloud SDK project.
-    load.return_value = mock.sentinel.credentials, None
+    load.return_value = MOCK_CREDENTIALS, None
 
     credentials, project_id = _default._get_gcloud_sdk_credentials()
 
-    assert credentials == mock.sentinel.credentials
+    assert credentials == MOCK_CREDENTIALS
     assert project_id == mock.sentinel.project_id
     assert get_project_id.called
 
@@ -241,11 +255,11 @@
 def test__get_gcloud_sdk_credentials_no_project_id(load, unused_isfile, get_project_id):
     # Don't return a project ID from load file, make the function check
     # the Cloud SDK project.
-    load.return_value = mock.sentinel.credentials, None
+    load.return_value = MOCK_CREDENTIALS, None
 
     credentials, project_id = _default._get_gcloud_sdk_credentials()
 
-    assert credentials == mock.sentinel.credentials
+    assert credentials == MOCK_CREDENTIALS
     assert project_id is None
     assert get_project_id.called
 
@@ -351,58 +365,58 @@
 
 @mock.patch(
     "google.auth._default._get_explicit_environ_credentials",
-    return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
 def test_default_early_out(unused_get):
-    assert _default.default() == (mock.sentinel.credentials, mock.sentinel.project_id)
+    assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id)
 
 
 @mock.patch(
     "google.auth._default._get_explicit_environ_credentials",
-    return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
 def test_default_explict_project_id(unused_get, monkeypatch):
     monkeypatch.setenv(environment_vars.PROJECT, "explicit-env")
-    assert _default.default() == (mock.sentinel.credentials, "explicit-env")
+    assert _default.default() == (MOCK_CREDENTIALS, "explicit-env")
 
 
 @mock.patch(
     "google.auth._default._get_explicit_environ_credentials",
-    return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
 def test_default_explict_legacy_project_id(unused_get, monkeypatch):
     monkeypatch.setenv(environment_vars.LEGACY_PROJECT, "explicit-env")
-    assert _default.default() == (mock.sentinel.credentials, "explicit-env")
+    assert _default.default() == (MOCK_CREDENTIALS, "explicit-env")
 
 
 @mock.patch("logging.Logger.warning", autospec=True)
 @mock.patch(
     "google.auth._default._get_explicit_environ_credentials",
-    return_value=(mock.sentinel.credentials, None),
+    return_value=(MOCK_CREDENTIALS, None),
     autospec=True,
 )
 @mock.patch(
     "google.auth._default._get_gcloud_sdk_credentials",
-    return_value=(mock.sentinel.credentials, None),
+    return_value=(MOCK_CREDENTIALS, None),
     autospec=True,
 )
 @mock.patch(
     "google.auth._default._get_gae_credentials",
-    return_value=(mock.sentinel.credentials, None),
+    return_value=(MOCK_CREDENTIALS, None),
     autospec=True,
 )
 @mock.patch(
     "google.auth._default._get_gce_credentials",
-    return_value=(mock.sentinel.credentials, None),
+    return_value=(MOCK_CREDENTIALS, None),
     autospec=True,
 )
 def test_default_without_project_id(
     unused_gce, unused_gae, unused_sdk, unused_explicit, logger_warning
 ):
-    assert _default.default() == (mock.sentinel.credentials, None)
+    assert _default.default() == (MOCK_CREDENTIALS, None)
     logger_warning.assert_called_with(mock.ANY, mock.ANY, mock.ANY)
 
 
@@ -433,10 +447,14 @@
 
 @mock.patch(
     "google.auth._default._get_explicit_environ_credentials",
-    return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
[email protected]("google.auth.credentials.with_scopes_if_required", autospec=True)
[email protected](
+    "google.auth.credentials.with_scopes_if_required",
+    return_value=MOCK_CREDENTIALS,
+    autospec=True,
+)
 def test_default_scoped(with_scopes, unused_get):
     scopes = ["one", "two"]
 
@@ -444,12 +462,12 @@
 
     assert credentials == with_scopes.return_value
     assert project_id == mock.sentinel.project_id
-    with_scopes.assert_called_once_with(mock.sentinel.credentials, scopes)
+    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes)
 
 
 @mock.patch(
     "google.auth._default._get_explicit_environ_credentials",
-    return_value=(mock.sentinel.credentials, mock.sentinel.project_id),
+    return_value=(MOCK_CREDENTIALS, mock.sentinel.project_id),
     autospec=True,
 )
 def test_default_no_app_engine_compute_engine_module(unused_get):
@@ -463,7 +481,4 @@
     with mock.patch.dict("sys.modules"):
         sys.modules["google.auth.compute_engine"] = None
         sys.modules["google.auth.app_engine"] = None
-        assert _default.default() == (
-            mock.sentinel.credentials,
-            mock.sentinel.project_id,
-        )
+        assert _default.default() == (MOCK_CREDENTIALS, mock.sentinel.project_id)
diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py
index 9dfdfa6..846d314 100644
--- a/tests/test_app_engine.py
+++ b/tests/test_app_engine.py
@@ -102,6 +102,7 @@
         # Scopes are required
         assert not credentials.scopes
         assert credentials.requires_scopes
+        assert not credentials.quota_project_id
 
     def test_with_scopes(self, app_identity):
         credentials = app_engine.Credentials()
@@ -114,6 +115,16 @@
         assert scoped_credentials.has_scopes(["email"])
         assert not scoped_credentials.requires_scopes
 
+    def test_with_quota_project(self, app_identity):
+        credentials = app_engine.Credentials()
+
+        assert not credentials.scopes
+        assert not credentials.quota_project_id
+
+        quota_project_creds = credentials.with_quota_project("project-foo")
+
+        assert quota_project_creds.quota_project_id == "project-foo"
+
     def test_service_account_email_implicit(self, app_identity):
         app_identity.get_service_account_name.return_value = (
             mock.sentinel.service_account_email
diff --git a/tests/test_credentials.py b/tests/test_credentials.py
index 16ddd9b..2023fac 100644
--- a/tests/test_credentials.py
+++ b/tests/test_credentials.py
@@ -24,6 +24,9 @@
     def refresh(self, request):
         self.token = request
 
+    def with_quota_project(self, quota_project_id):
+        raise NotImplementedError()
+
 
 def test_credentials_constructor():
     credentials = CredentialsImpl()
@@ -112,6 +115,12 @@
     assert headers == {}
 
 
+def test_anonymous_credentials_with_quota_project():
+    with pytest.raises(ValueError):
+        anon = credentials.AnonymousCredentials()
+        anon.with_quota_project("project-foo")
+
+
 class ReadOnlyScopedCredentialsImpl(credentials.ReadOnlyScoped, CredentialsImpl):
     @property
     def requires_scopes(self):
diff --git a/tests/test_iam.py b/tests/test_iam.py
index ea7d08a..e20eeba 100644
--- a/tests/test_iam.py
+++ b/tests/test_iam.py
@@ -50,6 +50,9 @@
         def refresh(self, request):
             pass
 
+        def with_quota_project(self, quota_project_id):
+            raise NotImplementedError()
+
     return CredentialsImpl()
 
 
diff --git a/tests/test_impersonated_credentials.py b/tests/test_impersonated_credentials.py
index e0b5b11..46850a0 100644
--- a/tests/test_impersonated_credentials.py
+++ b/tests/test_impersonated_credentials.py
@@ -311,6 +311,12 @@
         signature = credentials.sign_bytes(b"signed bytes")
         assert signature == b"signature"
 
+    def test_with_quota_project(self):
+        credentials = self.make_credentials()
+
+        quota_project_creds = credentials.with_quota_project("project-foo")
+        assert quota_project_creds._quota_project_id == "project-foo"
+
     def test_id_token_success(
         self, mock_donor_credentials, mock_authorizedsession_idtoken
     ):
@@ -435,3 +441,32 @@
         id_creds.refresh(request)
 
         assert id_creds.token == ID_TOKEN_DATA
+
+    def test_id_token_with_quota_project(
+        self, mock_donor_credentials, mock_authorizedsession_idtoken
+    ):
+        credentials = self.make_credentials(lifetime=None)
+        token = "token"
+        target_audience = "https://foo.bar"
+
+        expire_time = (
+            _helpers.utcnow().replace(microsecond=0) + datetime.timedelta(seconds=500)
+        ).isoformat("T") + "Z"
+        response_body = {"accessToken": token, "expireTime": expire_time}
+
+        request = self.make_request(
+            data=json.dumps(response_body), status=http_client.OK
+        )
+
+        credentials.refresh(request)
+
+        assert credentials.valid
+        assert not credentials.expired
+
+        id_creds = impersonated_credentials.IDTokenCredentials(
+            credentials, target_audience=target_audience
+        )
+        id_creds = id_creds.with_quota_project("project-foo")
+        id_creds.refresh(request)
+
+        assert id_creds.quota_project_id == "project-foo"
diff --git a/tests/test_jwt.py b/tests/test_jwt.py
index 488aee4..7aa031e 100644
--- a/tests/test_jwt.py
+++ b/tests/test_jwt.py
@@ -363,6 +363,18 @@
         assert new_credentials._subject == self.credentials._subject
         assert new_credentials._audience == new_audience
         assert new_credentials._additional_claims == self.credentials._additional_claims
+        assert new_credentials._quota_project_id == self.credentials._quota_project_id
+
+    def test_with_quota_project(self):
+        quota_project_id = "project-foo"
+
+        new_credentials = self.credentials.with_quota_project(quota_project_id)
+        assert new_credentials._signer == self.credentials._signer
+        assert new_credentials._issuer == self.credentials._issuer
+        assert new_credentials._subject == self.credentials._subject
+        assert new_credentials._audience == self.credentials._audience
+        assert new_credentials._additional_claims == self.credentials._additional_claims
+        assert new_credentials._quota_project_id == quota_project_id
 
     def test_sign_bytes(self):
         to_sign = b"123"
@@ -507,6 +519,16 @@
         assert new_credentials._subject == self.credentials._subject
         assert new_credentials._additional_claims == new_claims
 
+    def test_with_quota_project(self):
+        quota_project_id = "project-foo"
+        new_credentials = self.credentials.with_quota_project(quota_project_id)
+
+        assert new_credentials._signer == self.credentials._signer
+        assert new_credentials._issuer == self.credentials._issuer
+        assert new_credentials._subject == self.credentials._subject
+        assert new_credentials._additional_claims == self.credentials._additional_claims
+        assert new_credentials._quota_project_id == quota_project_id
+
     def test_sign_bytes(self):
         to_sign = b"123"
         signature = self.credentials.sign_bytes(to_sign)
diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py
index c3da76d..ef2e2e2 100644
--- a/tests/transport/test_grpc.py
+++ b/tests/transport/test_grpc.py
@@ -52,6 +52,9 @@
     def refresh(self, request):
         self.token += "1"
 
+    def with_quota_project(self, quota_project_id):
+        raise NotImplementedError()
+
 
 class TestAuthMetadataPlugin(object):
     def test_call_no_refresh(self):
diff --git a/tests/transport/test_requests.py b/tests/transport/test_requests.py
index 77e1527..7ac55ce 100644
--- a/tests/transport/test_requests.py
+++ b/tests/transport/test_requests.py
@@ -109,6 +109,9 @@
     def refresh(self, request):
         self.token += "1"
 
+    def with_quota_project(self, quota_project_id):
+        raise NotImplementedError()
+
 
 class TimeTickCredentialsStub(CredentialsStub):
     """Credentials that spend some (mocked) time when refreshing a token."""
diff --git a/tests/transport/test_urllib3.py b/tests/transport/test_urllib3.py
index 1a1c0a1..3158b92 100644
--- a/tests/transport/test_urllib3.py
+++ b/tests/transport/test_urllib3.py
@@ -65,6 +65,9 @@
     def refresh(self, request):
         self.token += "1"
 
+    def with_quota_project(self, quota_project_id):
+        raise NotImplementedError()
+
 
 class HttpStub(object):
     def __init__(self, responses, headers=None):