Implement application default credentials (#32)
diff --git a/tests/data/authorized_user.json b/tests/data/authorized_user.json new file mode 100644 index 0000000..4787ace --- /dev/null +++ b/tests/data/authorized_user.json
@@ -0,0 +1,6 @@ +{ + "client_id": "123", + "client_secret": "secret", + "refresh_token": "alabalaportocala", + "type": "authorized_user" +}
diff --git a/tests/data/cloud_sdk.cfg b/tests/data/cloud_sdk.cfg new file mode 100644 index 0000000..089aac5 --- /dev/null +++ b/tests/data/cloud_sdk.cfg
@@ -0,0 +1,2 @@ +[core] +project = example-project
diff --git a/tests/test__cloud_sdk.py b/tests/test__cloud_sdk.py new file mode 100644 index 0000000..35ee426 --- /dev/null +++ b/tests/test__cloud_sdk.py
@@ -0,0 +1,146 @@ +# Copyright 2016 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +import mock +import pytest + +from google.auth import _cloud_sdk +from google.auth import environment_vars +import google.oauth2.credentials + + +DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') +AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, 'authorized_user.json') + +with open(AUTHORIZED_USER_FILE) as fh: + AUTHORIZED_USER_FILE_DATA = json.load(fh) + +SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, 'service_account.json') + +with open(SERVICE_ACCOUNT_FILE) as fh: + SERVICE_ACCOUNT_FILE_DATA = json.load(fh) + +with open(os.path.join(DATA_DIR, 'cloud_sdk.cfg')) as fh: + CLOUD_SDK_CONFIG_DATA = fh.read() + +CONFIG_PATH_PATCH = mock.patch('google.auth._cloud_sdk.get_config_path') + + [email protected] +def config_file(tmpdir): + config_dir = tmpdir.join( + '.config', _cloud_sdk._CONFIG_DIRECTORY) + config_file = config_dir.join( + _cloud_sdk._ACTIVE_CONFIG_FILENAME) + + with CONFIG_PATH_PATCH as mock_get_config_dir: + mock_get_config_dir.return_value = str(config_dir) + yield config_file + + +def test_get_project_id(config_file): + config_file.write(CLOUD_SDK_CONFIG_DATA, ensure=True) + project_id = _cloud_sdk.get_project_id() + assert project_id == 'example-project' + + +def test_get_project_id_non_existent(config_file): + project_id = _cloud_sdk.get_project_id() + assert project_id is None + + +def test_get_project_id_bad_file(config_file): + config_file.write('<<<badconfig', ensure=True) + project_id = _cloud_sdk.get_project_id() + assert project_id is None + + +def test_get_project_id_no_section(config_file): + config_file.write('[section]', ensure=True) + project_id = _cloud_sdk.get_project_id() + assert project_id is None + + +@CONFIG_PATH_PATCH +def test_get_application_default_credentials_path(mock_get_config_dir): + config_path = 'config_path' + mock_get_config_dir.return_value = config_path + credentials_path = _cloud_sdk.get_application_default_credentials_path() + assert credentials_path == os.path.join( + config_path, _cloud_sdk._CREDENTIALS_FILENAME) + + +def test_get_config_path_env_var(monkeypatch): + config_path_sentinel = 'config_path' + monkeypatch.setenv( + environment_vars.CLOUD_SDK_CONFIG_DIR, config_path_sentinel) + config_path = _cloud_sdk.get_config_path() + assert config_path == config_path_sentinel + + [email protected]('os.path.expanduser') +def test_get_config_path_unix(mock_expanduser): + mock_expanduser.side_effect = lambda path: path + + config_path = _cloud_sdk.get_config_path() + + assert os.path.split(config_path) == ( + '~/.config', _cloud_sdk._CONFIG_DIRECTORY) + + [email protected]('os.name', new='nt') +def test_get_config_path_windows(monkeypatch): + appdata = 'appdata' + monkeypatch.setenv(_cloud_sdk._WINDOWS_CONFIG_ROOT_ENV_VAR, appdata) + + config_path = _cloud_sdk.get_config_path() + + assert os.path.split(config_path) == ( + appdata, _cloud_sdk._CONFIG_DIRECTORY) + + [email protected]('os.name', new='nt') +def test_get_config_path_no_appdata(monkeypatch): + monkeypatch.delenv(_cloud_sdk._WINDOWS_CONFIG_ROOT_ENV_VAR, raising=False) + monkeypatch.setenv('SystemDrive', 'G:') + + config_path = _cloud_sdk.get_config_path() + + assert os.path.split(config_path) == ( + 'G:/\\', _cloud_sdk._CONFIG_DIRECTORY) + + +def test_load_authorized_user_credentials(): + credentials = _cloud_sdk.load_authorized_user_credentials( + AUTHORIZED_USER_FILE_DATA) + + assert isinstance(credentials, google.oauth2.credentials.Credentials) + + assert credentials.token is None + assert (credentials._refresh_token == + AUTHORIZED_USER_FILE_DATA['refresh_token']) + assert credentials._client_id == AUTHORIZED_USER_FILE_DATA['client_id'] + assert (credentials._client_secret == + AUTHORIZED_USER_FILE_DATA['client_secret']) + assert credentials._token_uri == _cloud_sdk._GOOGLE_OAUTH2_TOKEN_ENDPOINT + + +def test_load_authorized_user_credentials_bad_format(): + with pytest.raises(ValueError) as excinfo: + _cloud_sdk.load_authorized_user_credentials({}) + + assert excinfo.match(r'missing fields')
diff --git a/tests/test__default.py b/tests/test__default.py new file mode 100644 index 0000000..137fdcd --- /dev/null +++ b/tests/test__default.py
@@ -0,0 +1,263 @@ +# Copyright 2016 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +import mock +import pytest + +from google.auth import _default +from google.auth import compute_engine +from google.auth import environment_vars +from google.auth import exceptions +from google.oauth2 import service_account +import google.oauth2.credentials + + +DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') +AUTHORIZED_USER_FILE = os.path.join(DATA_DIR, 'authorized_user.json') + +with open(AUTHORIZED_USER_FILE) as fh: + AUTHORIZED_USER_FILE_DATA = json.load(fh) + +SERVICE_ACCOUNT_FILE = os.path.join(DATA_DIR, 'service_account.json') + +with open(SERVICE_ACCOUNT_FILE) as fh: + SERVICE_ACCOUNT_FILE_DATA = json.load(fh) + +with open(os.path.join(DATA_DIR, 'cloud_sdk.cfg')) as fh: + CLOUD_SDK_CONFIG_DATA = fh.read() + +LOAD_FILE_PATCH = mock.patch( + 'google.auth._default._load_credentials_from_file', return_value=( + mock.sentinel.credentials, mock.sentinel.project_id)) + + +def test__load_credentials_from_file_invalid_json(tmpdir): + jsonfile = tmpdir.join('invalid.json') + jsonfile.write('{') + + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + _default._load_credentials_from_file(str(jsonfile)) + + assert excinfo.match(r'not a valid json file') + + +def test__load_credentials_from_file_invalid_type(tmpdir): + jsonfile = tmpdir.join('invalid.json') + jsonfile.write(json.dumps({'type': 'not-a-real-type'})) + + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + _default._load_credentials_from_file(str(jsonfile)) + + assert excinfo.match(r'does not have a valid type') + + +def test__load_credentials_from_file_authorized_user(): + credentials, project_id = _default._load_credentials_from_file( + AUTHORIZED_USER_FILE) + assert isinstance(credentials, google.oauth2.credentials.Credentials) + assert project_id is None + + +def test__load_credentials_from_file_authorized_user_bad_format(tmpdir): + filename = tmpdir.join('authorized_user_bad.json') + filename.write(json.dumps({'type': 'authorized_user'})) + + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + _default._load_credentials_from_file(str(filename)) + + assert excinfo.match(r'Failed to load authorized user') + assert excinfo.match(r'missing fields') + + +def test__load_credentials_from_file_service_account(): + credentials, project_id = _default._load_credentials_from_file( + SERVICE_ACCOUNT_FILE) + assert isinstance(credentials, service_account.Credentials) + assert project_id == SERVICE_ACCOUNT_FILE_DATA['project_id'] + + +def test__load_credentials_from_file_service_account_bad_format(tmpdir): + filename = tmpdir.join('serivce_account_bad.json') + filename.write(json.dumps({'type': 'service_account'})) + + with pytest.raises(exceptions.DefaultCredentialsError) as excinfo: + _default._load_credentials_from_file(str(filename)) + + assert excinfo.match(r'Failed to load service account') + assert excinfo.match(r'missing fields') + + [email protected](os.environ, {}, clear=True) +def test__get_explicit_environ_credentials_no_env(): + assert _default._get_explicit_environ_credentials() == (None, None) + + +@LOAD_FILE_PATCH +def test__get_explicit_environ_credentials(mock_load, monkeypatch): + monkeypatch.setenv(environment_vars.CREDENTIALS, 'filename') + + credentials, project_id = _default._get_explicit_environ_credentials() + + assert credentials is mock.sentinel.credentials + assert project_id is mock.sentinel.project_id + mock_load.assert_called_with('filename') + + +@LOAD_FILE_PATCH +def test__get_explicit_environ_credentials_no_project_id( + mock_load, monkeypatch): + mock_load.return_value = (mock.sentinel.credentials, None) + monkeypatch.setenv(environment_vars.CREDENTIALS, 'filename') + + credentials, project_id = _default._get_explicit_environ_credentials() + + assert credentials is mock.sentinel.credentials + assert project_id is None + + +@LOAD_FILE_PATCH [email protected]('google.auth._cloud_sdk.get_application_default_credentials_path') +def test__get_gcloud_sdk_credentials( + mock_get_adc_path, mock_load): + mock_get_adc_path.return_value = SERVICE_ACCOUNT_FILE + + credentials, project_id = _default._get_gcloud_sdk_credentials() + + assert credentials is mock.sentinel.credentials + assert project_id is mock.sentinel.project_id + mock_load.assert_called_with(SERVICE_ACCOUNT_FILE) + + [email protected]('google.auth._cloud_sdk.get_application_default_credentials_path') +def test__get_gcloud_sdk_credentials_non_existent(mock_get_adc_path, tmpdir): + non_existent = tmpdir.join('non-existent') + mock_get_adc_path.return_value = str(non_existent) + + credentials, project_id = _default._get_gcloud_sdk_credentials() + + assert credentials is None + assert project_id is None + + [email protected]( + 'google.auth._cloud_sdk.get_project_id', + return_value=mock.sentinel.project_id) [email protected]('os.path.isfile', return_value=True) +@LOAD_FILE_PATCH +def test__get_gcloud_sdk_credentials_project_id( + mock_load, unused_mock_isfile, mock_get_project_id): + # Don't return a project ID from load file, make the function check + # the Cloud SDK project. + mock_load.return_value = (mock.sentinel.credentials, None) + + credentials, project_id = _default._get_gcloud_sdk_credentials() + + assert credentials == mock.sentinel.credentials + assert project_id == mock.sentinel.project_id + assert mock_get_project_id.called + + [email protected]( + 'google.auth._cloud_sdk.get_project_id', + return_value=None) [email protected]('os.path.isfile', return_value=True) +@LOAD_FILE_PATCH +def test__get_gcloud_sdk_credentials_no_project_id( + mock_load, unused_mock_isfile, mock_get_project_id): + # Don't return a project ID from load file, make the function check + # the Cloud SDK project. + mock_load.return_value = (mock.sentinel.credentials, None) + + credentials, project_id = _default._get_gcloud_sdk_credentials() + + assert credentials == mock.sentinel.credentials + assert project_id is None + + +def test__get_gae_credentials(): + assert _default._get_gae_credentials() == (None, None) + + [email protected]( + 'google.auth.compute_engine._metadata.ping', return_value=True) [email protected]( + 'google.auth.compute_engine._metadata.get', return_value='example-project') +def test__get_gce_credentials(get_mock, ping_mock): + credentials, project_id = _default._get_gce_credentials() + + assert isinstance(credentials, compute_engine.Credentials) + assert project_id == 'example-project' + + [email protected]('google.auth.compute_engine._metadata.ping', return_value=False) +def test__get_gce_credentials_no_ping(ping_mock): + credentials, project_id = _default._get_gce_credentials() + + assert credentials is None + assert project_id is None + + [email protected]( + 'google.auth.compute_engine._metadata.ping', return_value=True) [email protected]( + 'google.auth.compute_engine._metadata.get', + side_effect=exceptions.TransportError()) +def test__get_gce_credentials_no_project_id(get_mock, ping_mock): + credentials, project_id = _default._get_gce_credentials() + + assert isinstance(credentials, compute_engine.Credentials) + assert project_id is None + + [email protected]('google.auth.compute_engine._metadata.ping', return_value=False) +def test__get_gce_credentials_explicit_request(ping_mock): + _default._get_gce_credentials(mock.sentinel.request) + ping_mock.assert_called_with(request=mock.sentinel.request) + + [email protected]( + 'google.auth._default._get_explicit_environ_credentials', + return_value=(mock.sentinel.credentials, mock.sentinel.project_id)) +def test_default_early_out(get_mock): + assert _default.default() == ( + mock.sentinel.credentials, mock.sentinel.project_id) + + [email protected]( + 'google.auth._default._get_explicit_environ_credentials', + return_value=(mock.sentinel.credentials, mock.sentinel.project_id)) +def test_default_explict_project_id(get_mock, monkeypatch): + monkeypatch.setenv(environment_vars.PROJECT, 'explicit-env') + assert _default.default() == ( + mock.sentinel.credentials, 'explicit-env') + + [email protected]( + 'google.auth._default._get_explicit_environ_credentials', + return_value=(None, None)) [email protected]( + 'google.auth._default._get_gcloud_sdk_credentials', + return_value=(None, None)) [email protected]( + 'google.auth._default._get_gae_credentials', + return_value=(None, None)) [email protected]( + 'google.auth._default._get_gce_credentials', + return_value=(None, None)) +def test_default_fail(unused_gce, unused_gae, unused_sdk, unused_explicit): + with pytest.raises(exceptions.DefaultCredentialsError): + assert _default.default()