feat: use self-signed jwt for service account (#665)

diff --git a/tests/compute_engine/test__metadata.py b/tests/compute_engine/test__metadata.py
index d053372..852822d 100644
--- a/tests/compute_engine/test__metadata.py
+++ b/tests/compute_engine/test__metadata.py
@@ -318,6 +318,44 @@
     assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
 
 
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_get_service_account_token_with_scopes_list(utcnow):
+    ttl = 500
+    request = make_request(
+        json.dumps({"access_token": "token", "expires_in": ttl}),
+        headers={"content-type": "application/json"},
+    )
+
+    token, expiry = _metadata.get_service_account_token(request, scopes=["foo", "bar"])
+
+    request.assert_called_once_with(
+        method="GET",
+        url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar",
+        headers=_metadata._METADATA_HEADERS,
+    )
+    assert token == "token"
+    assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
+
+
+@mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+def test_get_service_account_token_with_scopes_string(utcnow):
+    ttl = 500
+    request = make_request(
+        json.dumps({"access_token": "token", "expires_in": ttl}),
+        headers={"content-type": "application/json"},
+    )
+
+    token, expiry = _metadata.get_service_account_token(request, scopes="foo,bar")
+
+    request.assert_called_once_with(
+        method="GET",
+        url=_metadata._METADATA_ROOT + PATH + "/token" + "?scopes=foo%2Cbar",
+        headers=_metadata._METADATA_HEADERS,
+    )
+    assert token == "token"
+    assert expiry == utcnow() + datetime.timedelta(seconds=ttl)
+
+
 def test_get_service_account_info():
     key, value = "foo", "bar"
     request = make_request(
diff --git a/tests/oauth2/test_credentials.py b/tests/oauth2/test_credentials.py
index ee8b8a2..b885d29 100644
--- a/tests/oauth2/test_credentials.py
+++ b/tests/oauth2/test_credentials.py
@@ -127,6 +127,7 @@
         self, unused_utcnow, refresh_grant
     ):
         scopes = ["email", "profile"]
+        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
         token = "token"
         expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
         grant_response = {"id_token": mock.sentinel.id_token}
@@ -149,6 +150,7 @@
             client_id=self.CLIENT_ID,
             client_secret=self.CLIENT_SECRET,
             scopes=scopes,
+            default_scopes=default_scopes,
         )
 
         # Refresh credentials
@@ -179,6 +181,62 @@
         "google.auth._helpers.utcnow",
         return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
     )
+    def test_credentials_with_only_default_scopes_requested(
+        self, unused_utcnow, refresh_grant
+    ):
+        default_scopes = ["email", "profile"]
+        token = "token"
+        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+        grant_response = {"id_token": mock.sentinel.id_token}
+        refresh_grant.return_value = (
+            # Access token
+            token,
+            # New refresh token
+            None,
+            # Expiry,
+            expiry,
+            # Extra data
+            grant_response,
+        )
+
+        request = mock.create_autospec(transport.Request)
+        creds = credentials.Credentials(
+            token=None,
+            refresh_token=self.REFRESH_TOKEN,
+            token_uri=self.TOKEN_URI,
+            client_id=self.CLIENT_ID,
+            client_secret=self.CLIENT_SECRET,
+            default_scopes=default_scopes,
+        )
+
+        # Refresh credentials
+        creds.refresh(request)
+
+        # Check jwt grant call.
+        refresh_grant.assert_called_with(
+            request,
+            self.TOKEN_URI,
+            self.REFRESH_TOKEN,
+            self.CLIENT_ID,
+            self.CLIENT_SECRET,
+            default_scopes,
+        )
+
+        # Check that the credentials have the token and expiry
+        assert creds.token == token
+        assert creds.expiry == expiry
+        assert creds.id_token == mock.sentinel.id_token
+        assert creds.has_scopes(default_scopes)
+
+        # Check that the credentials are valid (have a token and are not
+        # expired.)
+        assert creds.valid
+
+    @mock.patch("google.oauth2._client.refresh_grant", autospec=True)
+    @mock.patch(
+        "google.auth._helpers.utcnow",
+        return_value=datetime.datetime.min + _helpers.CLOCK_SKEW,
+    )
     def test_credentials_with_scopes_returned_refresh_success(
         self, unused_utcnow, refresh_grant
     ):
diff --git a/tests/oauth2/test_service_account.py b/tests/oauth2/test_service_account.py
index 4c75e37..40a4ca2 100644
--- a/tests/oauth2/test_service_account.py
+++ b/tests/oauth2/test_service_account.py
@@ -203,6 +203,28 @@
         assert "x-goog-user-project" not in headers
         assert "token" in headers["authorization"]
 
+    @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
+    def test__create_self_signed_jwt(self, from_signing_credentials):
+        credentials = service_account.Credentials(
+            SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI
+        )
+
+        audience = "https://pubsub.googleapis.com"
+        credentials._create_self_signed_jwt(audience)
+        from_signing_credentials.assert_called_once_with(credentials, audience)
+
+    @mock.patch("google.auth.jwt.Credentials.from_signing_credentials", autospec=True)
+    def test__create_self_signed_jwt_with_user_scopes(self, from_signing_credentials):
+        credentials = service_account.Credentials(
+            SIGNER, self.SERVICE_ACCOUNT_EMAIL, self.TOKEN_URI, scopes=["foo"]
+        )
+
+        audience = "https://pubsub.googleapis.com"
+        credentials._create_self_signed_jwt(audience)
+
+        # JWT should not be created if there are user-defined scopes
+        from_signing_credentials.assert_not_called()
+
     @mock.patch("google.oauth2._client.jwt_grant", autospec=True)
     def test_refresh_success(self, jwt_grant):
         credentials = self.make_credentials()
@@ -257,6 +279,32 @@
         # Credentials should now be valid.
         assert credentials.valid
 
+    @mock.patch("google.auth.jwt.Credentials._make_jwt")
+    def test_refresh_with_jwt_credentials(self, make_jwt):
+        """before_request takes token/expiry from the patched _make_jwt."""
+        credentials = self.make_credentials()
+        credentials._create_self_signed_jwt("https://pubsub.googleapis.com")
+
+        request = mock.create_autospec(transport.Request, instance=True)
+        token = "token"
+        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
+        make_jwt.return_value = (token, expiry)
+
+        # Credentials should start as invalid
+        assert not credentials.valid
+
+        # before_request should cause a refresh
+        credentials.before_request(request, "GET", "http://example.com?a=1#3", {})
+
+        # Credentials should now be valid.
+        assert credentials.valid
+
+        # Must be assert_called_once(): Mock.called_once() is always truthy.
+        make_jwt.assert_called_once()
+
+        assert credentials.token == token
+        assert credentials.expiry == expiry
+
 
 class TestIDTokenCredentials(object):
     SERVICE_ACCOUNT_EMAIL = "service-account@example.com"
diff --git a/tests/test__default.py b/tests/test__default.py
index 2738e22..74511f9 100644
--- a/tests/test__default.py
+++ b/tests/test__default.py
@@ -471,7 +471,7 @@
 
     assert credentials == with_scopes.return_value
     assert project_id == mock.sentinel.project_id
-    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes)
+    with_scopes.assert_called_once_with(MOCK_CREDENTIALS, scopes, default_scopes=None)
 
 
 @mock.patch(
diff --git a/tests/test_app_engine.py b/tests/test_app_engine.py
index 846d314..e335ff7 100644
--- a/tests/test_app_engine.py
+++ b/tests/test_app_engine.py
@@ -101,6 +101,7 @@
         assert not credentials.expired
         # Scopes are required
         assert not credentials.scopes
+        assert not credentials.default_scopes
         assert credentials.requires_scopes
         assert not credentials.quota_project_id
 
@@ -115,6 +116,20 @@
         assert scoped_credentials.has_scopes(["email"])
         assert not scoped_credentials.requires_scopes
 
+    def test_with_default_scopes(self, app_identity):
+        credentials = app_engine.Credentials()
+
+        assert not credentials.scopes
+        assert not credentials.default_scopes
+        assert credentials.requires_scopes
+
+        scoped_credentials = credentials.with_scopes(
+            scopes=None, default_scopes=["email"]
+        )
+
+        assert scoped_credentials.has_scopes(["email"])
+        assert not scoped_credentials.requires_scopes
+
     def test_with_quota_project(self, app_identity):
         credentials = app_engine.Credentials()
 
@@ -147,7 +162,9 @@
         token = "token"
         ttl = 643942923
         app_identity.get_access_token.return_value = token, ttl
-        credentials = app_engine.Credentials(scopes=["email"])
+        credentials = app_engine.Credentials(
+            scopes=["email"], default_scopes=["profile"]
+        )
 
         credentials.refresh(None)
 
@@ -159,6 +176,23 @@
         assert credentials.valid
         assert not credentials.expired
 
+    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
+    def test_refresh_with_default_scopes(self, utcnow, app_identity):
+        token = "token"
+        ttl = 643942923
+        app_identity.get_access_token.return_value = token, ttl
+        credentials = app_engine.Credentials(default_scopes=["email"])
+
+        credentials.refresh(None)
+
+        app_identity.get_access_token.assert_called_with(
+            credentials.default_scopes, credentials._service_account_id
+        )
+        assert credentials.token == token
+        assert credentials.expiry == datetime.datetime(1990, 5, 29, 1, 2, 3)
+        assert credentials.valid
+        assert not credentials.expired
+
     def test_sign_bytes(self, app_identity):
         app_identity.sign_blob.return_value = (
             mock.sentinel.key_id,
diff --git a/tests/test_credentials.py b/tests/test_credentials.py
index 0637b01..0633b38 100644
--- a/tests/test_credentials.py
+++ b/tests/test_credentials.py
@@ -142,16 +142,19 @@
 
 
 class RequiresScopedCredentialsImpl(credentials.Scoped, CredentialsImpl):
-    def __init__(self, scopes=None):
+    def __init__(self, scopes=None, default_scopes=None):
         super(RequiresScopedCredentialsImpl, self).__init__()
         self._scopes = scopes
+        self._default_scopes = default_scopes
 
     @property
     def requires_scopes(self):
         return not self.scopes
 
-    def with_scopes(self, scopes):
-        return RequiresScopedCredentialsImpl(scopes=scopes)
+    def with_scopes(self, scopes, default_scopes=None):
+        return RequiresScopedCredentialsImpl(
+            scopes=scopes, default_scopes=default_scopes
+        )
 
 
 def test_create_scoped_if_required_scoped():
diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py
index 39f8b11..1602f4c 100644
--- a/tests/transport/test_grpc.py
+++ b/tests/transport/test_grpc.py
@@ -24,6 +24,7 @@
 from google.auth import environment_vars
 from google.auth import exceptions
 from google.auth import transport
+from google.oauth2 import service_account
 
 try:
     # pylint: disable=ungrouped-imports
@@ -74,7 +75,7 @@
         time.sleep(2)
 
         callback.assert_called_once_with(
-            [(u"authorization", u"Bearer {}".format(credentials.token))], None
+            [("authorization", "Bearer {}".format(credentials.token))], None
         )
 
     def test_call_refresh(self):
@@ -95,7 +96,41 @@
 
         assert credentials.token == "token1"
         callback.assert_called_once_with(
-            [(u"authorization", u"Bearer {}".format(credentials.token))], None
+            [("authorization", "Bearer {}".format(credentials.token))], None
+        )
+
+    def test__get_authorization_headers_with_service_account(self):
+        credentials = mock.create_autospec(service_account.Credentials)
+        request = mock.create_autospec(transport.Request)
+
+        plugin = google.auth.transport.grpc.AuthMetadataPlugin(credentials, request)
+
+        context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
+        context.method_name = "methodName"
+        context.service_url = "https://pubsub.googleapis.com/methodName"
+
+        plugin._get_authorization_headers(context)
+
+        # self-signed JWT should not be created when default_host is not set
+        credentials._create_self_signed_jwt.assert_not_called()
+
+    def test__get_authorization_headers_with_service_account_and_default_host(self):
+        credentials = mock.create_autospec(service_account.Credentials)
+        request = mock.create_autospec(transport.Request)
+
+        default_host = "pubsub.googleapis.com"
+        plugin = google.auth.transport.grpc.AuthMetadataPlugin(
+            credentials, request, default_host=default_host
+        )
+
+        context = mock.create_autospec(grpc.AuthMetadataContext, instance=True)
+        context.method_name = "methodName"
+        context.service_url = "https://pubsub.googleapis.com/methodName"
+
+        plugin._get_authorization_headers(context)
+
+        credentials._create_self_signed_jwt.assert_called_once_with(
+            "https://{}/".format(default_host)
         )