| # Copyright 2016 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import datetime |
| import json |
| import os |
| import pickle |
| import sys |
| |
| import mock |
| import pytest |
| |
| from google.auth import _helpers |
| from google.auth import exceptions |
| from google.auth import transport |
| from google.oauth2 import credentials |
| |
| |
# Directory containing the test data files, resolved relative to this module.
DATA_DIR = os.path.join(os.path.dirname(__file__), "..", "data")

# Path of the authorized-user JSON fixture used by the tests below.
AUTH_USER_JSON_FILE = os.path.join(DATA_DIR, "authorized_user.json")

# Parse the fixture once at import time; tests copy it before mutating.
with open(AUTH_USER_JSON_FILE, "r") as fh:
    AUTH_USER_INFO = json.loads(fh.read())
| |
| |
class TestCredentials(object):
    """Tests for ``google.oauth2.credentials.Credentials``."""

    TOKEN_URI = "https://example.com/oauth2/token"
    REFRESH_TOKEN = "refresh_token"
    RAPT_TOKEN = "rapt_token"
    CLIENT_ID = "client_id"
    CLIENT_SECRET = "client_secret"

    @classmethod
    def make_credentials(cls):
        """Return credentials built from the class constants with reauth enabled."""
        return credentials.Credentials(
            token=None,
            refresh_token=cls.REFRESH_TOKEN,
            token_uri=cls.TOKEN_URI,
            client_id=cls.CLIENT_ID,
            client_secret=cls.CLIENT_SECRET,
            rapt_token=cls.RAPT_TOKEN,
            enable_reauth_refresh=True,
        )

    @staticmethod
    def _make_handler_only_credentials(
        refresh_handler, scopes=None, default_scopes=None
    ):
        """Return credentials whose only means of refreshing is ``refresh_handler``.

        Every token/client field is explicitly ``None`` so that a refresh can
        only go through the supplied handler.
        """
        return credentials.Credentials(
            token=None,
            refresh_token=None,
            token_uri=None,
            client_id=None,
            client_secret=None,
            rapt_token=None,
            scopes=scopes,
            default_scopes=default_scopes,
            refresh_handler=refresh_handler,
        )

    @staticmethod
    def _grant_result(token, expiry, grant_response, rapt_token):
        """Return the 5-tuple used as the mocked ``refresh_grant`` return value.

        Order: access token, new refresh token (always ``None`` here), expiry,
        extra response data, rapt token.
        """
        return (token, None, expiry, grant_response, rapt_token)

    def test_default_state(self):
        """Freshly built credentials are not valid, not expired, and expose their fields."""
        creds = self.make_credentials()
        assert not creds.valid
        # Expiration hasn't been set yet
        assert not creds.expired
        # Scopes aren't required for these credentials
        assert not creds.requires_scopes
        # Test properties
        assert creds.refresh_token == self.REFRESH_TOKEN
        assert creds.token_uri == self.TOKEN_URI
        assert creds.client_id == self.CLIENT_ID
        assert creds.client_secret == self.CLIENT_SECRET
        assert creds.rapt_token == self.RAPT_TOKEN
        assert creds.refresh_handler is None

    def test_refresh_handler_setter_and_getter(self):
        """The ``refresh_handler`` property can be read, replaced and cleared."""
        scopes = ["email", "profile"]
        original_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_1", None))
        updated_refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN_2", None))
        creds = self._make_handler_only_credentials(
            original_refresh_handler, scopes=scopes
        )

        assert creds.refresh_handler is original_refresh_handler

        creds.refresh_handler = updated_refresh_handler

        assert creds.refresh_handler is updated_refresh_handler

        creds.refresh_handler = None

        assert creds.refresh_handler is None

    def test_invalid_refresh_handler(self):
        """A non-callable ``refresh_handler`` is rejected with ``TypeError``."""
        scopes = ["email", "profile"]
        with pytest.raises(TypeError) as excinfo:
            self._make_handler_only_credentials(object(), scopes=scopes)

        assert excinfo.match("The provided refresh_handler is not a callable or None.")

    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    def test_refresh_success(self, unused_utcnow, refresh_grant):
        """A refresh stores the token, expiry, id token and rapt token from the grant."""
        token = "token"
        new_rapt_token = "new_rapt_token"
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        grant_response = {"id_token": mock.sentinel.id_token}
        refresh_grant.return_value = self._grant_result(
            token, expiry, grant_response, new_rapt_token
        )

        request = mock.create_autospec(transport.Request)
        creds = self.make_credentials()

        # Refresh credentials
        creds.refresh(request)

        # Check the refresh grant call.
        refresh_grant.assert_called_with(
            request,
            self.TOKEN_URI,
            self.REFRESH_TOKEN,
            self.CLIENT_ID,
            self.CLIENT_SECRET,
            None,
            self.RAPT_TOKEN,
            True,
        )

        # Check that the credentials have the token and expiry
        assert creds.token == token
        assert creds.expiry == expiry
        assert creds.id_token == mock.sentinel.id_token
        assert creds.rapt_token == new_rapt_token

        # Check that the credentials are valid (have a token and are not
        # expired)
        assert creds.valid

    def test_refresh_no_refresh_token(self):
        """Refreshing without a refresh token raises and makes no request."""
        request = mock.create_autospec(transport.Request)
        creds = credentials.Credentials(token=None, refresh_token=None)

        with pytest.raises(exceptions.RefreshError, match="necessary fields"):
            creds.refresh(request)

        request.assert_not_called()

    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    def test_refresh_with_refresh_token_and_refresh_handler(
        self, unused_utcnow, refresh_grant
    ):
        """With both a refresh token and a handler, the refresh token is used."""
        token = "token"
        new_rapt_token = "new_rapt_token"
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        grant_response = {"id_token": mock.sentinel.id_token}
        refresh_grant.return_value = self._grant_result(
            token, expiry, grant_response, new_rapt_token
        )

        refresh_handler = mock.Mock()
        request = mock.create_autospec(transport.Request)
        creds = credentials.Credentials(
            token=None,
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            rapt_token=self.RAPT_TOKEN,
            refresh_handler=refresh_handler,
        )

        # Refresh credentials
        creds.refresh(request)

        # Check the refresh grant call. enable_reauth_refresh was not passed
        # to the constructor, hence the trailing False.
        refresh_grant.assert_called_with(
            request,
            self.TOKEN_URI,
            self.REFRESH_TOKEN,
            self.CLIENT_ID,
            self.CLIENT_SECRET,
            None,
            self.RAPT_TOKEN,
            False,
        )

        # Check that the credentials have the token and expiry
        assert creds.token == token
        assert creds.expiry == expiry
        assert creds.id_token == mock.sentinel.id_token
        assert creds.rapt_token == new_rapt_token

        # Check that the credentials are valid (have a token and are not
        # expired)
        assert creds.valid

        # Assert refresh handler not called as the refresh token has
        # higher priority.
        refresh_handler.assert_not_called()

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh_with_refresh_handler_success_scopes(self, unused_utcnow):
        """The handler is called with the explicit scopes and its result is stored."""
        expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
        refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
        scopes = ["email", "profile"]
        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        request = mock.create_autospec(transport.Request)
        creds = self._make_handler_only_credentials(
            refresh_handler, scopes=scopes, default_scopes=default_scopes
        )

        creds.refresh(request)

        assert creds.token == "ACCESS_TOKEN"
        assert creds.expiry == expected_expiry
        assert creds.valid
        assert not creds.expired
        # Confirm refresh handler called with the expected arguments.
        refresh_handler.assert_called_with(request, scopes=scopes)

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh_with_refresh_handler_success_default_scopes(self, unused_utcnow):
        """A handler set after construction is used, with the default scopes."""
        expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
        original_refresh_handler = mock.Mock(
            return_value=("UNUSED_TOKEN", expected_expiry)
        )
        refresh_handler = mock.Mock(return_value=("ACCESS_TOKEN", expected_expiry))
        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        request = mock.create_autospec(transport.Request)
        creds = self._make_handler_only_credentials(
            original_refresh_handler, scopes=None, default_scopes=default_scopes
        )

        # Test newly set refresh_handler is used instead of the original one.
        creds.refresh_handler = refresh_handler
        creds.refresh(request)

        assert creds.token == "ACCESS_TOKEN"
        assert creds.expiry == expected_expiry
        assert creds.valid
        assert not creds.expired
        # default_scopes should be used since no developer provided scopes
        # are provided.
        refresh_handler.assert_called_with(request, scopes=default_scopes)

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh_with_refresh_handler_invalid_token(self, unused_utcnow):
        """A handler returning a non-string token causes a ``RefreshError``."""
        expected_expiry = datetime.datetime.min + datetime.timedelta(seconds=2800)
        # Simulate refresh handler does not return a valid token.
        refresh_handler = mock.Mock(return_value=(None, expected_expiry))
        scopes = ["email", "profile"]
        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        request = mock.create_autospec(transport.Request)
        creds = self._make_handler_only_credentials(
            refresh_handler, scopes=scopes, default_scopes=default_scopes
        )

        with pytest.raises(
            exceptions.RefreshError, match="returned token is not a string"
        ):
            creds.refresh(request)

        assert creds.token is None
        assert creds.expiry is None
        assert not creds.valid
        # Confirm refresh handler called with the expected arguments.
        refresh_handler.assert_called_with(request, scopes=scopes)

    def test_refresh_with_refresh_handler_invalid_expiry(self):
        """A handler returning a non-datetime expiry causes a ``RefreshError``."""
        # Simulate refresh handler returns expiration time in an invalid unit.
        refresh_handler = mock.Mock(return_value=("TOKEN", 2800))
        scopes = ["email", "profile"]
        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        request = mock.create_autospec(transport.Request)
        creds = self._make_handler_only_credentials(
            refresh_handler, scopes=scopes, default_scopes=default_scopes
        )

        with pytest.raises(
            exceptions.RefreshError, match="returned expiry is not a datetime object"
        ):
            creds.refresh(request)

        assert creds.token is None
        assert creds.expiry is None
        assert not creds.valid
        # Confirm refresh handler called with the expected arguments.
        refresh_handler.assert_called_with(request, scopes=scopes)

    @mock.patch("google.auth._helpers.utcnow", return_value=datetime.datetime.min)
    def test_refresh_with_refresh_handler_expired_token(self, unused_utcnow):
        """A handler returning an already-expired token causes a ``RefreshError``."""
        # With "now" pinned to datetime.min, an expiry exactly
        # REFRESH_THRESHOLD away is expected to be reported as expired.
        expected_expiry = datetime.datetime.min + _helpers.REFRESH_THRESHOLD
        # Simulate refresh handler returns an expired token.
        refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))
        scopes = ["email", "profile"]
        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        request = mock.create_autospec(transport.Request)
        creds = self._make_handler_only_credentials(
            refresh_handler, scopes=scopes, default_scopes=default_scopes
        )

        with pytest.raises(exceptions.RefreshError, match="already expired"):
            creds.refresh(request)

        assert creds.token is None
        assert creds.expiry is None
        assert not creds.valid
        # Confirm refresh handler called with the expected arguments.
        refresh_handler.assert_called_with(request, scopes=scopes)

    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    def test_credentials_with_scopes_requested_refresh_success(
        self, unused_utcnow, refresh_grant
    ):
        """Explicit scopes (not the default scopes) are sent with the refresh grant."""
        scopes = ["email", "profile"]
        default_scopes = ["https://www.googleapis.com/auth/cloud-platform"]
        token = "token"
        new_rapt_token = "new_rapt_token"
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        grant_response = {"id_token": mock.sentinel.id_token, "scope": "email profile"}
        refresh_grant.return_value = self._grant_result(
            token, expiry, grant_response, new_rapt_token
        )

        request = mock.create_autospec(transport.Request)
        creds = credentials.Credentials(
            token=None,
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            scopes=scopes,
            default_scopes=default_scopes,
            rapt_token=self.RAPT_TOKEN,
            enable_reauth_refresh=True,
        )

        # Refresh credentials
        creds.refresh(request)

        # Check the refresh grant call.
        refresh_grant.assert_called_with(
            request,
            self.TOKEN_URI,
            self.REFRESH_TOKEN,
            self.CLIENT_ID,
            self.CLIENT_SECRET,
            scopes,
            self.RAPT_TOKEN,
            True,
        )

        # Check that the credentials have the token and expiry
        assert creds.token == token
        assert creds.expiry == expiry
        assert creds.id_token == mock.sentinel.id_token
        assert creds.has_scopes(scopes)
        assert creds.rapt_token == new_rapt_token

        # Check that the credentials are valid (have a token and are not
        # expired.)
        assert creds.valid

    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    def test_credentials_with_only_default_scopes_requested(
        self, unused_utcnow, refresh_grant
    ):
        """When only default scopes are set, they are sent with the refresh grant."""
        default_scopes = ["email", "profile"]
        token = "token"
        new_rapt_token = "new_rapt_token"
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        grant_response = {"id_token": mock.sentinel.id_token}
        refresh_grant.return_value = self._grant_result(
            token, expiry, grant_response, new_rapt_token
        )

        request = mock.create_autospec(transport.Request)
        creds = credentials.Credentials(
            token=None,
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            default_scopes=default_scopes,
            rapt_token=self.RAPT_TOKEN,
            enable_reauth_refresh=True,
        )

        # Refresh credentials
        creds.refresh(request)

        # Check the refresh grant call.
        refresh_grant.assert_called_with(
            request,
            self.TOKEN_URI,
            self.REFRESH_TOKEN,
            self.CLIENT_ID,
            self.CLIENT_SECRET,
            default_scopes,
            self.RAPT_TOKEN,
            True,
        )

        # Check that the credentials have the token and expiry
        assert creds.token == token
        assert creds.expiry == expiry
        assert creds.id_token == mock.sentinel.id_token
        assert creds.has_scopes(default_scopes)
        assert creds.rapt_token == new_rapt_token

        # Check that the credentials are valid (have a token and are not
        # expired.)
        assert creds.valid

    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    def test_credentials_with_scopes_returned_refresh_success(
        self, unused_utcnow, refresh_grant
    ):
        """A refresh succeeds when the grant response returns all requested scopes."""
        scopes = ["email", "profile"]
        token = "token"
        new_rapt_token = "new_rapt_token"
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        # The returned scopes go under the "scope" key, matching the other
        # scope tests in this class; the original "scopes" key meant the
        # returned-scopes case was likely never exercised.
        grant_response = {
            "id_token": mock.sentinel.id_token,
            "scope": " ".join(scopes),
        }
        refresh_grant.return_value = self._grant_result(
            token, expiry, grant_response, new_rapt_token
        )

        request = mock.create_autospec(transport.Request)
        creds = credentials.Credentials(
            token=None,
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            scopes=scopes,
            rapt_token=self.RAPT_TOKEN,
            enable_reauth_refresh=True,
        )

        # Refresh credentials
        creds.refresh(request)

        # Check the refresh grant call.
        refresh_grant.assert_called_with(
            request,
            self.TOKEN_URI,
            self.REFRESH_TOKEN,
            self.CLIENT_ID,
            self.CLIENT_SECRET,
            scopes,
            self.RAPT_TOKEN,
            True,
        )

        # Check that the credentials have the token and expiry
        assert creds.token == token
        assert creds.expiry == expiry
        assert creds.id_token == mock.sentinel.id_token
        assert creds.has_scopes(scopes)
        assert creds.rapt_token == new_rapt_token

        # Check that the credentials are valid (have a token and are not
        # expired.)
        assert creds.valid

    @mock.patch("google.oauth2.reauth.refresh_grant", autospec=True)
    @mock.patch(
        "google.auth._helpers.utcnow",
        return_value=datetime.datetime.min + _helpers.REFRESH_THRESHOLD,
    )
    def test_credentials_with_scopes_refresh_failure_raises_refresh_error(
        self, unused_utcnow, refresh_grant
    ):
        """A refresh raises when the grant returns fewer scopes than requested."""
        scopes = ["email", "profile"]
        scopes_returned = ["email"]
        token = "token"
        new_rapt_token = "new_rapt_token"
        expiry = _helpers.utcnow() + datetime.timedelta(seconds=500)
        grant_response = {
            "id_token": mock.sentinel.id_token,
            "scope": " ".join(scopes_returned),
        }
        refresh_grant.return_value = self._grant_result(
            token, expiry, grant_response, new_rapt_token
        )

        request = mock.create_autospec(transport.Request)
        creds = credentials.Credentials(
            token=None,
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            scopes=scopes,
            rapt_token=self.RAPT_TOKEN,
            enable_reauth_refresh=True,
        )

        # Refresh credentials
        with pytest.raises(
            exceptions.RefreshError, match="Not all requested scopes were granted"
        ):
            creds.refresh(request)

        # Check the refresh grant call.
        refresh_grant.assert_called_with(
            request,
            self.TOKEN_URI,
            self.REFRESH_TOKEN,
            self.CLIENT_ID,
            self.CLIENT_SECRET,
            scopes,
            self.RAPT_TOKEN,
            True,
        )

        # Even though the refresh raised, the values from the grant are
        # expected to have been stored on the credentials.
        assert creds.token == token
        assert creds.expiry == expiry
        assert creds.id_token == mock.sentinel.id_token
        assert creds.has_scopes(scopes)
        assert creds.rapt_token == new_rapt_token

        # Check that the credentials are valid (have a token and are not
        # expired.)
        assert creds.valid

    def test_apply_with_quota_project_id(self):
        """``apply`` adds both the quota project header and the authorization header."""
        creds = credentials.Credentials(
            token="token",
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            quota_project_id="quota-project-123",
        )

        headers = {}
        creds.apply(headers)
        assert headers["x-goog-user-project"] == "quota-project-123"
        assert "token" in headers["authorization"]

    def test_apply_with_no_quota_project_id(self):
        """Without a quota project, ``apply`` adds only the authorization header."""
        creds = credentials.Credentials(
            token="token",
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
        )

        headers = {}
        creds.apply(headers)
        assert "x-goog-user-project" not in headers
        assert "token" in headers["authorization"]

    def test_with_quota_project(self):
        """``with_quota_project`` returns credentials that apply the new project."""
        creds = credentials.Credentials(
            token="token",
            refresh_token=self.REFRESH_TOKEN,
            token_uri=self.TOKEN_URI,
            client_id=self.CLIENT_ID,
            client_secret=self.CLIENT_SECRET,
            quota_project_id="quota-project-123",
        )

        new_creds = creds.with_quota_project("new-project-456")
        assert new_creds.quota_project_id == "new-project-456"
        headers = {}
        # Apply the *new* credentials: applying the original ones would only
        # prove that the old quota project is still emitted.
        new_creds.apply(headers)
        assert headers["x-goog-user-project"] == "new-project-456"

    def test_from_authorized_user_info(self):
        """``from_authorized_user_info`` reads client fields, scopes and expiry."""
        info = AUTH_USER_INFO.copy()

        creds = credentials.Credentials.from_authorized_user_info(info)
        assert creds.client_secret == info["client_secret"]
        assert creds.client_id == info["client_id"]
        assert creds.refresh_token == info["refresh_token"]
        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
        assert creds.scopes is None

        scopes = ["email", "profile"]
        creds = credentials.Credentials.from_authorized_user_info(info, scopes)
        assert creds.client_secret == info["client_secret"]
        assert creds.client_id == info["client_id"]
        assert creds.refresh_token == info["refresh_token"]
        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
        assert creds.scopes == scopes

        info["scopes"] = "email"  # single non-array scope from file
        creds = credentials.Credentials.from_authorized_user_info(info)
        assert creds.scopes == [info["scopes"]]

        info["scopes"] = ["email", "profile"]  # array scope from file
        creds = credentials.Credentials.from_authorized_user_info(info)
        assert creds.scopes == info["scopes"]

        expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
        info["expiry"] = expiry.isoformat() + "Z"
        creds = credentials.Credentials.from_authorized_user_info(info)
        assert creds.expiry == expiry
        assert creds.expired

    def test_from_authorized_user_file(self):
        """``from_authorized_user_file`` loads the fixture, with and without scopes."""
        info = AUTH_USER_INFO.copy()

        creds = credentials.Credentials.from_authorized_user_file(AUTH_USER_JSON_FILE)
        assert creds.client_secret == info["client_secret"]
        assert creds.client_id == info["client_id"]
        assert creds.refresh_token == info["refresh_token"]
        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
        assert creds.scopes is None
        assert creds.rapt_token is None

        scopes = ["email", "profile"]
        creds = credentials.Credentials.from_authorized_user_file(
            AUTH_USER_JSON_FILE, scopes
        )
        assert creds.client_secret == info["client_secret"]
        assert creds.client_id == info["client_id"]
        assert creds.refresh_token == info["refresh_token"]
        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
        assert creds.scopes == scopes

    def test_from_authorized_user_file_with_rapt_token(self):
        """A rapt token stored in the authorized-user file is loaded."""
        info = AUTH_USER_INFO.copy()
        file_path = os.path.join(DATA_DIR, "authorized_user_with_rapt_token.json")

        creds = credentials.Credentials.from_authorized_user_file(file_path)
        assert creds.client_secret == info["client_secret"]
        assert creds.client_id == info["client_id"]
        assert creds.refresh_token == info["refresh_token"]
        assert creds.token_uri == credentials._GOOGLE_OAUTH2_TOKEN_ENDPOINT
        assert creds.scopes is None
        assert creds.rapt_token == "rapt"

    def test_to_json(self):
        """``to_json`` serializes the fields, honours ``strip`` and a missing expiry."""
        info = AUTH_USER_INFO.copy()
        expiry = datetime.datetime(2020, 8, 14, 15, 54, 1)
        info["expiry"] = expiry.isoformat() + "Z"
        creds = credentials.Credentials.from_authorized_user_info(info)
        assert creds.expiry == expiry

        # Test with no `strip` arg
        json_output = creds.to_json()
        json_asdict = json.loads(json_output)
        assert json_asdict.get("token") == creds.token
        assert json_asdict.get("refresh_token") == creds.refresh_token
        assert json_asdict.get("token_uri") == creds.token_uri
        assert json_asdict.get("client_id") == creds.client_id
        assert json_asdict.get("scopes") == creds.scopes
        assert json_asdict.get("client_secret") == creds.client_secret
        assert json_asdict.get("expiry") == info["expiry"]

        # Test with a `strip` arg
        json_output = creds.to_json(strip=["client_secret"])
        json_asdict = json.loads(json_output)
        assert json_asdict.get("token") == creds.token
        assert json_asdict.get("refresh_token") == creds.refresh_token
        assert json_asdict.get("token_uri") == creds.token_uri
        assert json_asdict.get("client_id") == creds.client_id
        assert json_asdict.get("scopes") == creds.scopes
        assert json_asdict.get("client_secret") is None

        # Test with no expiry
        creds.expiry = None
        json_output = creds.to_json()
        json_asdict = json.loads(json_output)
        assert json_asdict.get("expiry") is None

    def test_pickle_and_unpickle(self):
        """A pickle round trip preserves the attribute names and their values."""
        creds = self.make_credentials()
        unpickled = pickle.loads(pickle.dumps(creds))

        # make sure attributes aren't lost during pickling
        # (sorted() returns the list; list.sort() returns None, which made
        # the previous comparison always true).
        assert sorted(creds.__dict__) == sorted(unpickled.__dict__)

        for attr in list(creds.__dict__):
            assert getattr(creds, attr) == getattr(unpickled, attr)

    def test_pickle_and_unpickle_with_refresh_handler(self):
        """A pickle round trip keeps every attribute except the refresh handler."""
        expected_expiry = _helpers.utcnow() + datetime.timedelta(seconds=2800)
        refresh_handler = mock.Mock(return_value=("TOKEN", expected_expiry))

        creds = self._make_handler_only_credentials(refresh_handler)
        unpickled = pickle.loads(pickle.dumps(creds))

        # make sure attributes aren't lost during pickling
        # (sorted() returns the list; list.sort() returns None, which made
        # the previous comparison always true).
        assert sorted(creds.__dict__) == sorted(unpickled.__dict__)

        for attr in list(creds.__dict__):
            # For the _refresh_handler property, the unpickled creds should be
            # set to None.
            if attr == "_refresh_handler":
                assert getattr(unpickled, attr) is None
            else:
                assert getattr(creds, attr) == getattr(unpickled, attr)

    def test_pickle_with_missing_attribute(self):
        """Unpickling state that lacks an optional attribute still initializes it."""
        creds = self.make_credentials()

        # remove an optional attribute before pickling
        # this mimics a pickle created with a previous class definition with
        # fewer attributes
        del creds.__dict__["_quota_project_id"]

        unpickled = pickle.loads(pickle.dumps(creds))

        # Attribute should be initialized by `__setstate__`
        assert unpickled.quota_project_id is None

    # pickles are not compatible across versions
    @pytest.mark.skipif(
        sys.version_info < (3, 5),
        reason="pickle file can only be loaded with Python >= 3.5",
    )
    def test_unpickle_old_credentials_pickle(self):
        """A pickle written by an older library version can still be loaded."""
        # make sure a credentials file pickled with an older
        # library version (google-auth==1.5.1) can be unpickled
        pickle_path = os.path.join(DATA_DIR, "old_oauth_credentials_py3.pickle")
        with open(pickle_path, "rb") as f:
            creds = pickle.load(f)
            assert creds.quota_project_id is None
| |
| |
class TestUserAccessTokenCredentials(object):
    """Tests for ``google.oauth2.credentials.UserAccessTokenCredentials``."""

    def test_instance(self):
        """A default instance has no account; ``with_account`` returns one that has."""
        without_account = credentials.UserAccessTokenCredentials()
        assert without_account._account is None

        with_account = without_account.with_account("account")
        assert with_account._account == "account"

    @mock.patch("google.auth._cloud_sdk.get_auth_access_token", autospec=True)
    def test_refresh(self, get_auth_access_token):
        """``refresh`` stores the token returned by the patched Cloud SDK helper."""
        get_auth_access_token.return_value = "access_token"

        user_creds = credentials.UserAccessTokenCredentials()
        user_creds.refresh(None)

        assert user_creds.token == "access_token"

    def test_with_quota_project(self):
        """``with_quota_project`` sets the quota project and keeps the account."""
        base_creds = credentials.UserAccessTokenCredentials()
        derived_creds = base_creds.with_quota_project("project-foo")

        assert derived_creds._quota_project_id == "project-foo"
        assert derived_creds._account == base_creds._account

    @mock.patch(
        "google.oauth2.credentials.UserAccessTokenCredentials.apply", autospec=True
    )
    @mock.patch(
        "google.oauth2.credentials.UserAccessTokenCredentials.refresh", autospec=True
    )
    def test_before_request(self, refresh, apply):
        """``before_request`` calls both ``refresh`` and ``apply``."""
        user_creds = credentials.UserAccessTokenCredentials()
        request = mock.Mock()

        user_creds.before_request(request, "GET", "https://example.com", {})

        refresh.assert_called()
        apply.assert_called()