| # Copyright 2016 Google Inc. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import datetime |
| |
| import mock |
| import pytest |
| |
| from google.auth import _helpers |
| from google.auth import app_engine |
| |
| |
| class _AppIdentityModule(object): |
| """The interface of the App Idenity app engine module. |
| See https://cloud.google.com/appengine/docs/standard/python/refdocs |
| /google.appengine.api.app_identity.app_identity |
| """ |
| def get_application_id(self): |
| raise NotImplementedError() |
| |
| def sign_blob(self, bytes_to_sign, deadline=None): |
| raise NotImplementedError() |
| |
| def get_service_account_name(self, deadline=None): |
| raise NotImplementedError() |
| |
| def get_access_token(self, scopes, service_account_id=None): |
| raise NotImplementedError() |
| |
| |
| @pytest.fixture |
| def app_identity(monkeypatch): |
| """Mocks the app_identity module for google.auth.app_engine.""" |
| app_identity_module = mock.create_autospec( |
| _AppIdentityModule, instance=True) |
| monkeypatch.setattr( |
| app_engine, 'app_identity', app_identity_module) |
| yield app_identity_module |
| |
| |
| def test_get_project_id(app_identity): |
| app_identity.get_application_id.return_value = mock.sentinel.project |
| assert app_engine.get_project_id() == mock.sentinel.project |
| |
| |
| def test_get_project_id_missing_apis(): |
| with pytest.raises(EnvironmentError) as excinfo: |
| assert app_engine.get_project_id() |
| |
| assert excinfo.match(r'App Engine APIs are not available') |
| |
| |
| class TestSigner(object): |
| def test_key_id(self, app_identity): |
| app_identity.sign_blob.return_value = ( |
| mock.sentinel.key_id, mock.sentinel.signature) |
| |
| signer = app_engine.Signer() |
| |
| assert signer.key_id is None |
| |
| def test_sign(self, app_identity): |
| app_identity.sign_blob.return_value = ( |
| mock.sentinel.key_id, mock.sentinel.signature) |
| |
| signer = app_engine.Signer() |
| to_sign = b'123' |
| |
| signature = signer.sign(to_sign) |
| |
| assert signature == mock.sentinel.signature |
| app_identity.sign_blob.assert_called_with(to_sign) |
| |
| |
| class TestCredentials(object): |
| def test_missing_apis(self): |
| with pytest.raises(EnvironmentError) as excinfo: |
| app_engine.Credentials() |
| |
| assert excinfo.match(r'App Engine APIs are not available') |
| |
| def test_default_state(self, app_identity): |
| credentials = app_engine.Credentials() |
| |
| # Not token acquired yet |
| assert not credentials.valid |
| # Expiration hasn't been set yet |
| assert not credentials.expired |
| # Scopes are required |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| |
| def test_with_scopes(self, app_identity): |
| credentials = app_engine.Credentials() |
| |
| assert not credentials.scopes |
| assert credentials.requires_scopes |
| |
| scoped_credentials = credentials.with_scopes(['email']) |
| |
| assert scoped_credentials.has_scopes(['email']) |
| assert not scoped_credentials.requires_scopes |
| |
| def test_service_account_email_implicit(self, app_identity): |
| app_identity.get_service_account_name.return_value = ( |
| mock.sentinel.service_account_email) |
| credentials = app_engine.Credentials() |
| |
| assert (credentials.service_account_email == |
| mock.sentinel.service_account_email) |
| assert app_identity.get_service_account_name.called |
| |
| def test_service_account_email_explicit(self, app_identity): |
| credentials = app_engine.Credentials( |
| service_account_id=mock.sentinel.service_account_email) |
| |
| assert (credentials.service_account_email == |
| mock.sentinel.service_account_email) |
| assert not app_identity.get_service_account_name.called |
| |
| @mock.patch( |
| 'google.auth._helpers.utcnow', |
| return_value=datetime.datetime.min) |
| def test_refresh(self, utcnow, app_identity): |
| token = 'token' |
| ttl = _helpers.CLOCK_SKEW_SECS + 100 |
| app_identity.get_access_token.return_value = token, ttl |
| credentials = app_engine.Credentials(scopes=['email']) |
| |
| credentials.refresh(None) |
| |
| app_identity.get_access_token.assert_called_with( |
| credentials.scopes, credentials._service_account_id) |
| assert credentials.token == token |
| assert credentials.expiry == ( |
| utcnow() + datetime.timedelta(seconds=ttl)) |
| assert credentials.valid |
| assert not credentials.expired |
| |
| def test_sign_bytes(self, app_identity): |
| app_identity.sign_blob.return_value = ( |
| mock.sentinel.key_id, mock.sentinel.signature) |
| credentials = app_engine.Credentials() |
| to_sign = b'123' |
| |
| signature = credentials.sign_bytes(to_sign) |
| |
| assert signature == mock.sentinel.signature |
| app_identity.sign_blob.assert_called_with(to_sign) |
| |
| def test_signer(self, app_identity): |
| credentials = app_engine.Credentials() |
| assert isinstance(credentials.signer, app_engine.Signer) |
| |
| def test_signer_email(self, app_identity): |
| credentials = app_engine.Credentials() |
| assert credentials.signer_email == credentials.service_account_email |