feat: use self-signed jwt for service account (#665)

diff --git a/google/auth/_default.py b/google/auth/_default.py
index 4377893..3b8c281 100644
--- a/google/auth/_default.py
+++ b/google/auth/_default.py
@@ -69,7 +69,9 @@
         warnings.warn(_CLOUD_SDK_CREDENTIALS_WARNING)
 
 
-def load_credentials_from_file(filename, scopes=None, quota_project_id=None):
+def load_credentials_from_file(
+    filename, scopes=None, default_scopes=None, quota_project_id=None
+):
     """Loads Google credentials from a file.
 
     The credentials file must be a service account key or stored authorized
@@ -80,6 +82,8 @@
         scopes (Optional[Sequence[str]]): The list of scopes for the credentials. If
             specified, the credentials will automatically be scoped if
             necessary
+        default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+            Google client library. Use 'scopes' for user-defined scopes.
         quota_project_id (Optional[str]):  The project ID used for
                 quota and billing.
 
@@ -132,7 +136,7 @@
 
         try:
             credentials = service_account.Credentials.from_service_account_info(
-                info, scopes=scopes
+                info, scopes=scopes, default_scopes=default_scopes
             )
         except ValueError as caught_exc:
             msg = "Failed to load service account credentials from {}".format(filename)
@@ -248,7 +252,7 @@
         return None, None
 
 
-def default(scopes=None, request=None, quota_project_id=None):
+def default(scopes=None, request=None, quota_project_id=None, default_scopes=None):
     """Gets the default credentials for the current environment.
 
     `Application Default Credentials`_ provides an easy way to obtain
@@ -312,6 +316,8 @@
             use the standard library http client to make requests.
         quota_project_id (Optional[str]):  The project ID used for
             quota and billing.
+        default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+            Google client library. Use 'scopes' for user-defined scopes.
     Returns:
         Tuple[~google.auth.credentials.Credentials, Optional[str]]:
             the current environment's credentials and project ID. Project ID
@@ -339,7 +345,9 @@
     for checker in checkers:
         credentials, project_id = checker()
         if credentials is not None:
-            credentials = with_scopes_if_required(credentials, scopes)
+            credentials = with_scopes_if_required(
+                credentials, scopes, default_scopes=default_scopes
+            )
             if quota_project_id:
                 credentials = credentials.with_quota_project(quota_project_id)
 
diff --git a/google/auth/app_engine.py b/google/auth/app_engine.py
index f1d2128..81aef73 100644
--- a/google/auth/app_engine.py
+++ b/google/auth/app_engine.py
@@ -86,11 +86,19 @@
     tokens.
     """
 
-    def __init__(self, scopes=None, service_account_id=None, quota_project_id=None):
+    def __init__(
+        self,
+        scopes=None,
+        default_scopes=None,
+        service_account_id=None,
+        quota_project_id=None,
+    ):
         """
         Args:
             scopes (Sequence[str]): Scopes to request from the App Identity
                 API.
+            default_scopes (Sequence[str]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
             service_account_id (str): The service account ID passed into
                 :func:`google.appengine.api.app_identity.get_access_token`.
                 If not specified, the default application service account
@@ -109,16 +117,16 @@
 
         super(Credentials, self).__init__()
         self._scopes = scopes
+        self._default_scopes = default_scopes
         self._service_account_id = service_account_id
         self._signer = Signer()
         self._quota_project_id = quota_project_id
 
     @_helpers.copy_docstring(credentials.Credentials)
     def refresh(self, request):
+        scopes = self._scopes if self._scopes is not None else self._default_scopes
         # pylint: disable=unused-argument
-        token, ttl = app_identity.get_access_token(
-            self._scopes, self._service_account_id
-        )
+        token, ttl = app_identity.get_access_token(scopes, self._service_account_id)
         expiry = datetime.datetime.utcfromtimestamp(ttl)
 
         self.token, self.expiry = token, expiry
@@ -137,12 +145,13 @@
         Returns:
             bool: True if there are no scopes set otherwise False.
         """
-        return not self._scopes
+        return not self._scopes and not self._default_scopes
 
     @_helpers.copy_docstring(credentials.Scoped)
-    def with_scopes(self, scopes):
+    def with_scopes(self, scopes, default_scopes=None):
         return self.__class__(
             scopes=scopes,
+            default_scopes=default_scopes,
             service_account_id=self._service_account_id,
             quota_project_id=self.quota_project_id,
         )
diff --git a/google/auth/compute_engine/credentials.py b/google/auth/compute_engine/credentials.py
index 2906310..1671656 100644
--- a/google/auth/compute_engine/credentials.py
+++ b/google/auth/compute_engine/credentials.py
@@ -52,7 +52,11 @@
     """
 
     def __init__(
-        self, service_account_email="default", quota_project_id=None, scopes=None
+        self,
+        service_account_email="default",
+        quota_project_id=None,
+        scopes=None,
+        default_scopes=None,
     ):
         """
         Args:
@@ -61,11 +65,15 @@
                 accounts.
             quota_project_id (Optional[str]): The project ID used for quota and
                 billing.
+            scopes (Optional[Sequence[str]]): The list of scopes for the credentials.
+            default_scopes (Optional[Sequence[str]]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
         """
         super(Credentials, self).__init__()
         self._service_account_email = service_account_email
         self._quota_project_id = quota_project_id
         self._scopes = scopes
+        self._default_scopes = default_scopes
 
     def _retrieve_info(self, request):
         """Retrieve information about the service account.
@@ -98,12 +106,11 @@
                 service can't be reached if if the instance has not
                 credentials.
         """
+        scopes = self._scopes if self._scopes is not None else self._default_scopes
         try:
             self._retrieve_info(request)
             self.token, self.expiry = _metadata.get_service_account_token(
-                request,
-                service_account=self._service_account_email,
-                scopes=self._scopes,
+                request, service_account=self._service_account_email, scopes=scopes
             )
         except exceptions.TransportError as caught_exc:
             new_exc = exceptions.RefreshError(caught_exc)
@@ -131,12 +138,13 @@
         )
 
     @_helpers.copy_docstring(credentials.Scoped)
-    def with_scopes(self, scopes):
+    def with_scopes(self, scopes, default_scopes=None):
         # Compute Engine credentials can not be scoped (the metadata service
         # ignores the scopes parameter). App Engine, Cloud Run and Flex support
         # requesting scopes.
         return self.__class__(
             scopes=scopes,
+            default_scopes=default_scopes,
             service_account_email=self._service_account_email,
             quota_project_id=self._quota_project_id,
         )
diff --git a/google/auth/credentials.py b/google/auth/credentials.py
index 02082ca..7d3c798 100644
--- a/google/auth/credentials.py
+++ b/google/auth/credentials.py
@@ -220,12 +220,18 @@
     def __init__(self):
         super(ReadOnlyScoped, self).__init__()
         self._scopes = None
+        self._default_scopes = None
 
     @property
     def scopes(self):
         """Sequence[str]: the credentials' current set of scopes."""
         return self._scopes
 
+    @property
+    def default_scopes(self):
+        """Sequence[str]: the credentials' current set of default scopes."""
+        return self._default_scopes
+
     @abc.abstractproperty
     def requires_scopes(self):
         """True if these credentials require scopes to obtain an access token.
@@ -244,7 +250,10 @@
         Returns:
             bool: True if the credentials have the given scopes.
         """
-        return set(scopes).issubset(set(self._scopes or []))
+        credential_scopes = (
+            self._scopes if self._scopes is not None else self._default_scopes
+        )
+        return set(scopes).issubset(set(credential_scopes or []))
 
 
 class Scoped(ReadOnlyScoped):
@@ -277,7 +286,7 @@
     """
 
     @abc.abstractmethod
-    def with_scopes(self, scopes):
+    def with_scopes(self, scopes, default_scopes=None):
         """Create a copy of these credentials with the specified scopes.
 
         Args:
@@ -292,7 +301,7 @@
         raise NotImplementedError("This class does not require scoping.")
 
 
-def with_scopes_if_required(credentials, scopes):
+def with_scopes_if_required(credentials, scopes, default_scopes=None):
     """Creates a copy of the credentials with scopes if scoping is required.
 
     This helper function is useful when you do not know (or care to know) the
@@ -306,6 +315,8 @@
         credentials (google.auth.credentials.Credentials): The credentials to
             scope if necessary.
         scopes (Sequence[str]): The list of scopes to use.
+        default_scopes (Sequence[str]): Default scopes passed by a
+            Google client library. Use 'scopes' for user-defined scopes.
 
     Returns:
         google.auth.credentials.Credentials: Either a new set of scoped
@@ -313,7 +324,7 @@
             was required.
     """
     if isinstance(credentials, Scoped) and credentials.requires_scopes:
-        return credentials.with_scopes(scopes)
+        return credentials.with_scopes(scopes, default_scopes=default_scopes)
     else:
         return credentials
 
diff --git a/google/auth/transport/grpc.py b/google/auth/transport/grpc.py
index ab7d0db..04c0f4f 100644
--- a/google/auth/transport/grpc.py
+++ b/google/auth/transport/grpc.py
@@ -24,6 +24,7 @@
 from google.auth import environment_vars
 from google.auth import exceptions
 from google.auth.transport import _mtls_helper
+from google.oauth2 import service_account
 
 try:
     import grpc
@@ -51,15 +52,19 @@
             add to requests.
         request (google.auth.transport.Request): A HTTP transport request
             object used to refresh credentials as needed.
+        default_host (Optional[str]): A host like "pubsub.googleapis.com".
+            This is used when a self-signed JWT is created from service
+            account credentials.
     """
 
-    def __init__(self, credentials, request):
+    def __init__(self, credentials, request, default_host=None):
         # pylint: disable=no-value-for-parameter
         # pylint doesn't realize that the super method takes no arguments
         # because this class is the same name as the superclass.
         super(AuthMetadataPlugin, self).__init__()
         self._credentials = credentials
         self._request = request
+        self._default_host = default_host
 
     def _get_authorization_headers(self, context):
         """Gets the authorization headers for a request.
@@ -69,6 +74,19 @@
                 to add to the request.
         """
         headers = {}
+
+        # https://google.aip.dev/auth/4111
+        # Attempt to use self-signed JWTs when a service account is used.
+        # A default host must be explicitly provided since it cannot always
+        # be determined from the context.service_url.
+        if (
+            isinstance(self._credentials, service_account.Credentials)
+            and self._default_host
+        ):
+            self._credentials._create_self_signed_jwt(
+                "https://{}/".format(self._default_host)
+            )
+
         self._credentials.before_request(
             self._request, context.method_name, context.service_url, headers
         )
diff --git a/google/oauth2/credentials.py b/google/oauth2/credentials.py
index 36b8f0c..464cc48 100644
--- a/google/oauth2/credentials.py
+++ b/google/oauth2/credentials.py
@@ -66,6 +66,7 @@
         client_id=None,
         client_secret=None,
         scopes=None,
+        default_scopes=None,
         quota_project_id=None,
         expiry=None,
     ):
@@ -91,6 +92,8 @@
                 token if refresh information is provided (e.g. The refresh
                 token scopes are a superset of this or contain a wild card
                 scope like 'https://www.googleapis.com/auth/any-api').
+            default_scopes (Sequence[str]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
             quota_project_id (Optional[str]): The project ID used for quota and billing.
                 This project may be different from the project used to
                 create the credentials.
@@ -101,6 +104,7 @@
         self._refresh_token = refresh_token
         self._id_token = id_token
         self._scopes = scopes
+        self._default_scopes = default_scopes
         self._token_uri = token_uri
         self._client_id = client_id
         self._client_secret = client_secret
@@ -121,6 +125,7 @@
         self._refresh_token = d.get("_refresh_token")
         self._id_token = d.get("_id_token")
         self._scopes = d.get("_scopes")
+        self._default_scopes = d.get("_default_scopes")
         self._token_uri = d.get("_token_uri")
         self._client_id = d.get("_client_id")
         self._client_secret = d.get("_client_secret")
@@ -180,6 +185,7 @@
             client_id=self.client_id,
             client_secret=self.client_secret,
             scopes=self.scopes,
+            default_scopes=self.default_scopes,
             quota_project_id=quota_project_id,
         )
 
@@ -197,13 +203,15 @@
                 "token_uri, client_id, and client_secret."
             )
 
+        scopes = self._scopes if self._scopes is not None else self._default_scopes
+
         access_token, refresh_token, expiry, grant_response = _client.refresh_grant(
             request,
             self._token_uri,
             self._refresh_token,
             self._client_id,
             self._client_secret,
-            self._scopes,
+            scopes,
         )
 
         self.token = access_token
@@ -211,8 +219,8 @@
         self._refresh_token = refresh_token
         self._id_token = grant_response.get("id_token")
 
-        if self._scopes and "scopes" in grant_response:
-            requested_scopes = frozenset(self._scopes)
+        if scopes and "scopes" in grant_response:
+            requested_scopes = frozenset(scopes)
             granted_scopes = frozenset(grant_response["scopes"].split())
             scopes_requested_but_not_granted = requested_scopes - granted_scopes
             if scopes_requested_but_not_granted:
diff --git a/google/oauth2/service_account.py b/google/oauth2/service_account.py
index c4898a2..ed91011 100644
--- a/google/oauth2/service_account.py
+++ b/google/oauth2/service_account.py
@@ -126,6 +126,7 @@
         service_account_email,
         token_uri,
         scopes=None,
+        default_scopes=None,
         subject=None,
         project_id=None,
         quota_project_id=None,
@@ -135,8 +136,10 @@
         Args:
             signer (google.auth.crypt.Signer): The signer used to sign JWTs.
             service_account_email (str): The service account's email.
-            scopes (Sequence[str]): Scopes to request during the authorization
-                grant.
+            scopes (Sequence[str]): User-defined scopes to request during the
+                authorization grant.
+            default_scopes (Sequence[str]): Default scopes passed by a
+                Google client library. Use 'scopes' for user-defined scopes.
             token_uri (str): The OAuth 2.0 Token URI.
             subject (str): For domain-wide delegation, the email address of the
                 user to for which to request delegated access.
@@ -155,6 +158,7 @@
         super(Credentials, self).__init__()
 
         self._scopes = scopes
+        self._default_scopes = default_scopes
         self._signer = signer
         self._service_account_email = service_account_email
         self._subject = subject
@@ -162,6 +166,8 @@
         self._quota_project_id = quota_project_id
         self._token_uri = token_uri
 
+        self._jwt_credentials = None
+
         if additional_claims is not None:
             self._additional_claims = additional_claims
         else:
@@ -249,11 +255,12 @@
         return True if not self._scopes else False
 
     @_helpers.copy_docstring(credentials.Scoped)
-    def with_scopes(self, scopes):
+    def with_scopes(self, scopes, default_scopes=None):
         return self.__class__(
             self._signer,
             service_account_email=self._service_account_email,
             scopes=scopes,
+            default_scopes=default_scopes,
             token_uri=self._token_uri,
             subject=self._subject,
             project_id=self._project_id,
@@ -275,6 +282,7 @@
             self._signer,
             service_account_email=self._service_account_email,
             scopes=self._scopes,
+            default_scopes=self._default_scopes,
             token_uri=self._token_uri,
             subject=subject,
             project_id=self._project_id,
@@ -301,6 +309,7 @@
             self._signer,
             service_account_email=self._service_account_email,
             scopes=self._scopes,
+            default_scopes=self._default_scopes,
             token_uri=self._token_uri,
             subject=self._subject,
             project_id=self._project_id,
@@ -314,6 +323,7 @@
         return self.__class__(
             self._signer,
             service_account_email=self._service_account_email,
+            default_scopes=self._default_scopes,
             scopes=self._scopes,
             token_uri=self._token_uri,
             subject=self._subject,
@@ -357,10 +367,30 @@
 
     @_helpers.copy_docstring(credentials.Credentials)
     def refresh(self, request):
-        assertion = self._make_authorization_grant_assertion()
-        access_token, expiry, _ = _client.jwt_grant(request, self._token_uri, assertion)
-        self.token = access_token
-        self.expiry = expiry
+        if self._jwt_credentials is not None:
+            self._jwt_credentials.refresh(request)
+            self.token = self._jwt_credentials.token
+            self.expiry = self._jwt_credentials.expiry
+        else:
+            assertion = self._make_authorization_grant_assertion()
+            access_token, expiry, _ = _client.jwt_grant(
+                request, self._token_uri, assertion
+            )
+            self.token = access_token
+            self.expiry = expiry
+
+    def _create_self_signed_jwt(self, audience):
+        """Create a self-signed JWT from the credentials if requirements are met.
+
+        Args:
+            audience (str): The service URL. ``https://[API_ENDPOINT]/``
+        """
+        # https://google.aip.dev/auth/4111
+        # If the user has not defined scopes, create a self-signed jwt
+        if not self.scopes:
+            self._jwt_credentials = jwt.Credentials.from_signing_credentials(
+                self, audience
+            )
 
     @_helpers.copy_docstring(credentials.Signing)
     def sign_bytes(self, message):
diff --git a/system_tests/system_tests_sync/test_grpc.py b/system_tests/system_tests_sync/test_grpc.py
index 7dcbd4c..da2eb71 100644
--- a/system_tests/system_tests_sync/test_grpc.py
+++ b/system_tests/system_tests_sync/test_grpc.py
@@ -16,15 +16,18 @@
 import google.auth.credentials
 import google.auth.jwt
 import google.auth.transport.grpc
+from google.oauth2 import service_account
+
 from google.cloud import pubsub_v1
 
 
 def test_grpc_request_with_regular_credentials(http_request):
     credentials, project_id = google.auth.default()
     credentials = google.auth.credentials.with_scopes_if_required(
-        credentials, ["https://www.googleapis.com/auth/pubsub"]
+        credentials, scopes=["https://www.googleapis.com/auth/pubsub"]
     )
 
+
     # Create a pub/sub client.
     client = pubsub_v1.PublisherClient(credentials=credentials)
 
@@ -34,6 +37,30 @@
     list(list_topics_iter)
 
 
+def test_grpc_request_with_regular_credentials_and_self_signed_jwt(http_request):
+    credentials, project_id = google.auth.default()
+
+    # At the time this test is being written, there are no GAPIC libraries
+    # that will trigger the self-signed JWT flow. Manually create the self-signed
+    # jwt on the service account credential to check that the request
+    # succeeds.
+    credentials = credentials.with_scopes(
+        scopes=[], default_scopes=["https://www.googleapis.com/auth/pubsub"]
+    )
+    credentials._create_self_signed_jwt(audience="https://pubsub.googleapis.com/")
+
+    # Create a pub/sub client.
+    client = pubsub_v1.PublisherClient(credentials=credentials)
+
+    # list the topics and drain the iterator to test that an authorized API
+    # call works.
+    list_topics_iter = client.list_topics(project="projects/{}".format(project_id))
+    list(list_topics_iter)
+    
+    # Check that self-signed JWT was created
+    assert credentials._jwt_credentials is not None
+
+
 def test_grpc_request_with_jwt_credentials():
     credentials, project_id = google.auth.default()
     audience = "https://pubsub.googleapis.com/google.pubsub.v1.Publisher"
diff --git a/tests/compute_engine/test__metadata.py b/tests/compute_engine/test__metadata.py
index d053372..852822d 100644
--- a/tests/compute_engine/test__metadata.py
+++ b/tests/compute_engine/test__metadata.py
@@ -318,6 +318,44 @@
     assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
 
 
[email protected]("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_get_service_account_token_with_scopes_list(utcnow):
+    ttl = 500
+    request = make_request(
+        json.dumps({"access_token": "token", "expires_in": ttl}),
+        headers={"content-type": "application/json"},
+    )
+
+    token, expiry = _metadata.get_service_account_token(request, scopes=["foo", "bar"])
+
+    request.assert_called_once_with(
+        method="GET",
+        url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar",
+        headers=_metadata._METADATA_HEADERS,
+    )
+    assert token == "token"
+    assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
+
+
[email protected]("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_get_service_account_token_with_scopes_string(utcnow):
+    ttl = 500
+    request = make_request(
+        json.dumps({"access_token": "token", "expires_in": ttl}),
+        headers={"content-type": "application/json"},
+    )
+
+    token, expiry = _metadata.get_service_account_token(request, scopes="foo,bar")
+
+    request.assert_called_once_with(
+        method="GET",
+        url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar",
+        headers=_metadata._METADATA_HEADERS,
+    )
+    assert token == "token"
+    assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
+
+
 def test_get_service_account_info():
     key, value = "foo", "bar"
     request = make_request(
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index ee8b8a2..b885d29 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -127,6 +127,7 @@
         self, unused_utcnow, refresh_grant
     ):
         scopes = ["email", "profile"]
+        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
         token = "token"
         expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
         grant_response = {"id_token": mock.sentinel.id_token}
@@ -149,6 +150,7 @@
             client_id=self.CLIENT_ID,
             client_secret=self.CLIENT_SECRET,
             scopes=scopes,
+            default_scopes=default_scopes,
         )
 
         # Refresh credentials
@@ -179,6 +181,62 @@
         "google.auth._helpers.utcnow",
         return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
     )
+    def test_credentials_with_only_default_scopes_requested(
+        self, unused_utcnow, refresh_grant
+    ):
+        default_scopes = ["email", "profile"]
+        token = "token"
+        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+        grant_response = {"id_token": mock.sentinel.id_token}
+        refresh_grant.return_value = (
+            # Access token
+            token,
+            # New refresh token
+            None,
+            # Expiry,
+            expiry,
+            # Extra data
+            grant_response,
+        )
+
+        request = mock.create_autospec(transport.Request)
+        creds = credentials.Credentials(
+            token=None,
+            refresh_token=self.REFRESH_TOKEN,
+            token_uri=self.TOKEN_URI,
+            client_id=self.CLIENT_ID,
+            client_secret=self.CLIENT_SECRET,
+            default_scopes=default_scopes,
+        )
+
+        # Refresh credentials
+        creds.refresh(request)
+
+        # Check jwt grant call.
+        refresh_grant.assert_called_with(
+            request,
+            self.TOKEN_URI,
+            self.REFRESH_TOKEN,
+            self.CLIENT_ID,
+            self.CLIENT_SECRET,
+            default_scopes,
+        )
+
+        # Check that the credentials have the token and expiry
+        assert creds.token == token
+        assert creds.expiry == expiry
+        assert creds.id_token == mock.sentinel.id_token
+        assert creds.has_scopes(default_scopes)
+
+        # Check that the credentials are valid (have a token and are not
+        # expired.)
+        assert creds.valid
+
+    @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+    @mock.patch(
+        "google.auth._helpers.utcnow",
+        return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+    )
     def test_credentials_with_scopes_returned_refresh_success(
         self, unused_utcnow, refresh_grant
     ):
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 4c75e37..40a4ca2 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -203,6 +203,28 @@
         assert "x-goog-user-project" not in headers
         assert "token" in headers["authorization"]
 
+    @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
+    def test__create_self_signed_jwt(self, from_signing_credentials):
+        credentials = service_account.Credentials(
+            SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
+        )
+
+        audience = "https://pubsub.googleapis.com"
+        credentials._create_self_signed_jwt(audience)
+        from_signing_credentials.assert_called_once_with(credentials, audience)
+
+    @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
+    def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials):
+        credentials = service_account.Credentials(
+            SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
+        )
+
+        audience = "https://pubsub.googleapis.com"
+        credentials._create_self_signed_jwt(audience)
+
+        # JWT should not be created if there are user-defined scopes
+        from_signing_credentials.assert_not_called()
+
     @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
     def test_refresh_success(self, jwt_grant):
         credentials = self.make_credentials()
@@ -257,6 +279,32 @@
         # Credentials should now be valid.
         assert credentials.valid
 
+    @mock.patch("google.auth.jwt.Credentials._make_jwt")
+    def test_refresh_with_jwt_credentials(self, make_jwt):
+        credentials = self.make_credentials()
+        credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
+
+        request = mock.create_autospec(transport.Request, instance=True)
+
+        token = "token"
+        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+        make_jwt.return_value = (token, expiry)
+
+        # Credentials should start as invalid
+        assert not credentials.valid
+
+        # before_request should cause a refresh
+        credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
+
+        # Credentials should now be valid.
+        assert credentials.valid
+
+        # Assert make_jwt was called
+        assert make_jwt.called_once()
+
+        assert credentials.token == token
+        assert credentials.expiry == expiry
+
 
 class TestIDTokenCredentials(object):
     SERVICE_ACCOUNT_EMAIL = "[email protected]"
diff --git a/tests/test__default.py b/tests/test__default.py
index 2738e22..74511f9 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -471,7 +471,7 @@
 
     assert credentials == with_scopes.return_value
     assert project_id == mock.sentinel.project_id
-    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes)
+    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes, default_scopes=None)
 
 
 @mock.patch(
diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py
index 846d314..e335ff7 100644
--- a/tests/test_app_engine.py
+++ b/tests/test_app_engine.py
@@ -101,6 +101,7 @@
         assert not credentials.expired
         # Scopes are required
         assert not credentials.scopes
+        assert not credentials.default_scopes
         assert credentials.requires_scopes
         assert not credentials.quota_project_id
 
@@ -115,6 +116,20 @@
         assert scoped_credentials.has_scopes(["email"])
         assert not scoped_credentials.requires_scopes
 
+    def test_with_default_scopes(self, app_identity):
+        credentials = app_engine.Credentials()
+
+        assert not credentials.scopes
+        assert not credentials.default_scopes
+        assert credentials.requires_scopes
+
+        scoped_credentials = credentials.with_scopes(
+            scopes=None, default_scopes=["email"]
+        )
+
+        assert scoped_credentials.has_scopes(["email"])
+        assert not scoped_credentials.requires_scopes
+
     def test_with_quota_project(self, app_identity):
         credentials = app_engine.Credentials()
 
@@ -147,7 +162,9 @@
         token = "token"
         ttl = 643942923
         app_identity.get_access_token.return_value = token, ttl
-        credentials = app_engine.Credentials(scopes=["email"])
+        credentials = app_engine.Credentials(
+            scopes=["email"], default_scopes=["profile"]
+        )
 
         credentials.refresh(None)
 
@@ -159,6 +176,23 @@
         assert credentials.valid
         assert not credentials.expired
 
+    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+    def test_refresh_with_default_scopes(self, utcnow, app_identity):
+        token = "token"
+        ttl = 643942923
+        app_identity.get_access_token.return_value = token, ttl
+        credentials = app_engine.Credentials(default_scopes=["email"])
+
+        credentials.refresh(None)
+
+        app_identity.get_access_token.assert_called_with(
+            credentials.default_scopes, credentials._service_account_id
+        )
+        assert credentials.token == token
+        assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
+        assert credentials.valid
+        assert not credentials.expired
+
     def test_sign_bytes(self, app_identity):
         app_identity.sign_blob.return_value = (
             mock.sentinel.key_id,
diff --git a/tests/test_credentials.py b/tests/test_credentials.py
index 0637b01..0633b38 100644
--- a/tests/test_credentials.py
+++ b/tests/test_credentials.py
@@ -142,16 +142,19 @@
 
 
 class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
-    def __init__(self, scopes=None):
+    def __init__(self, scopes=None, default_scopes=None):
         super(RequiresScopedCredentialsImpl, self).__init__()
         self._scopes = scopes
+        self._default_scopes = default_scopes
 
     @property
     def requires_scopes(self):
         return not self.scopes
 
-    def with_scopes(self, scopes):
-        return RequiresScopedCredentialsImpl(scopes=scopes)
+    def with_scopes(self, scopes, default_scopes=None):
+        return RequiresScopedCredentialsImpl(
+            scopes=scopes, default_scopes=default_scopes
+        )
 
 
 def test_create_scoped_if_required_scoped():
diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py
index 39f8b11..1602f4c 100644
--- a/tests/transport/test_grpc.py
+++ b/tests/transport/test_grpc.py
@@ -24,6 +24,7 @@
 from google.auth import environment_vars
 from google.auth import exceptions
 from google.auth import transport
+from google.oauth2 import service_account
 
 try:
     # pylint: disable=ungrouped-imports
@@ -74,7 +75,7 @@
         time.sleep(2)
 
         callback.assert_called_once_with(
-            [(u"authorization", u"Bearer {}".format(credentials.token))], None
+            [("authorization", "Bearer {}".format(credentials.token))], None
         )
 
     def test_call_refresh(self):
@@ -95,7 +96,41 @@
 
         assert credentials.token == "token1"
         callback.assert_called_once_with(
-            [(u"authorization", u"Bearer {}".format(credentials.token))], None
+            [("authorization", "Bearer {}".format(credentials.token))], None
+        )
+
+    def test__get_authorization_headers_with_service_account(self):
+        credentials = mock.create_autospec(service_account.Credentials)
+        request = mock.create_autospec(transport.Request)
+
+        plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request)
+
+        context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
+        context.method_name = "methodName"
+        context.service_url = "https://pubsub.googleapis.com/methodName"
+
+        plugin._get_authorization_headers(context)
+
+        # self-signed JWT should not be created when default_host is not set
+        credentials._create_self_signed_jwt.assert_not_called()
+
+    def test__get_authorization_headers_with_service_account_and_default_host(self):
+        credentials = mock.create_autospec(service_account.Credentials)
+        request = mock.create_autospec(transport.Request)
+
+        default_host = "pubsub.googleapis.com"
+        plugin = google.auth.transport.grpc.AuthMetadataPlugin(
+            credentials, request, default_host=default_host
+        )
+
+        context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
+        context.method_name = "methodName"
+        context.service_url = "https://pubsub.googleapis.com/methodName"
+
+        plugin._get_authorization_headers(context)
+
+        credentials._create_self_signed_jwt.assert_called_once_with(
+            "https://{}/".format(default_host)
         )