| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import asyncio |
| import json |
| import logging |
| import pathlib |
| import pytest |
| import tempfile |
| import os |
| |
| from bumble.keys import JsonKeyStore, PairingKeys |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Logging |
| # ----------------------------------------------------------------------------- |
| logger = logging.getLogger(__name__) |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Tests |
| # ----------------------------------------------------------------------------- |
| |
| JSON1 = """ |
| { |
| "my_namespace": { |
| "14:7D:DA:4E:53:A8/P": { |
| "address_type": 0, |
| "irk": { |
| "authenticated": false, |
| "value": "e7b2543b206e4e46b44f9e51dad22bd1" |
| }, |
| "link_key": { |
| "authenticated": false, |
| "value": "0745dd9691e693d9dca740f7d8dfea75" |
| }, |
| "ltk": { |
| "authenticated": false, |
| "value": "d1897ee10016eb1a08e4e037fd54c683" |
| } |
| } |
| } |
| } |
| """ |
| |
| JSON2 = """ |
| { |
| "my_namespace1": { |
| }, |
| "my_namespace2": { |
| } |
| } |
| """ |
| |
| JSON3 = """ |
| { |
| "my_namespace1": { |
| }, |
| "__DEFAULT__": { |
| "14:7D:DA:4E:53:A8/P": { |
| "address_type": 0, |
| "irk": { |
| "authenticated": false, |
| "value": "e7b2543b206e4e46b44f9e51dad22bd1" |
| } |
| } |
| } |
| } |
| """ |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.fixture |
| def temporary_file(): |
| file = tempfile.NamedTemporaryFile(delete=False) |
| file.close() |
| yield file.name |
| pathlib.Path(file.name).unlink() |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_basic(temporary_file): |
| with open(temporary_file, mode='w', encoding='utf-8') as file: |
| file.write("{}") |
| file.flush() |
| |
| keystore = JsonKeyStore('my_namespace', temporary_file) |
| |
| keys = await keystore.get_all() |
| assert len(keys) == 0 |
| |
| keys = PairingKeys() |
| await keystore.update('foo', keys) |
| foo = await keystore.get('foo') |
| assert foo is not None |
| assert foo.ltk is None |
| ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) |
| keys.ltk = PairingKeys.Key(ltk) |
| await keystore.update('foo', keys) |
| foo = await keystore.get('foo') |
| assert foo is not None |
| assert foo.ltk is not None |
| assert foo.ltk.value == ltk |
| |
| with open(file.name, "r", encoding="utf-8") as json_file: |
| json_data = json.load(json_file) |
| assert 'my_namespace' in json_data |
| assert 'foo' in json_data['my_namespace'] |
| assert 'ltk' in json_data['my_namespace']['foo'] |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_parsing(temporary_file): |
| with open(temporary_file, mode='w', encoding='utf-8') as file: |
| file.write(JSON1) |
| file.flush() |
| |
| keystore = JsonKeyStore('my_namespace', file.name) |
| foo = await keystore.get('14:7D:DA:4E:53:A8/P') |
| assert foo is not None |
| assert foo.ltk.value == bytes.fromhex('d1897ee10016eb1a08e4e037fd54c683') |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_default_namespace(temporary_file): |
| with open(temporary_file, mode='w', encoding='utf-8') as file: |
| file.write(JSON1) |
| file.flush() |
| |
| keystore = JsonKeyStore(None, file.name) |
| all_keys = await keystore.get_all() |
| assert len(all_keys) == 1 |
| name, keys = all_keys[0] |
| assert name == '14:7D:DA:4E:53:A8/P' |
| assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') |
| |
| with open(temporary_file, mode='w', encoding='utf-8') as file: |
| file.write(JSON2) |
| file.flush() |
| |
| keystore = JsonKeyStore(None, file.name) |
| keys = PairingKeys() |
| ltk = bytes([0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]) |
| keys.ltk = PairingKeys.Key(ltk) |
| await keystore.update('foo', keys) |
| with open(file.name, "r", encoding="utf-8") as json_file: |
| json_data = json.load(json_file) |
| assert '__DEFAULT__' in json_data |
| assert 'foo' in json_data['__DEFAULT__'] |
| assert 'ltk' in json_data['__DEFAULT__']['foo'] |
| |
| with open(temporary_file, mode='w', encoding='utf-8') as file: |
| file.write(JSON3) |
| file.flush() |
| |
| keystore = JsonKeyStore(None, file.name) |
| all_keys = await keystore.get_all() |
| assert len(all_keys) == 1 |
| name, keys = all_keys[0] |
| assert name == '14:7D:DA:4E:53:A8/P' |
| assert keys.irk.value == bytes.fromhex('e7b2543b206e4e46b44f9e51dad22bd1') |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def run_tests(): |
| await test_basic() |
| await test_parsing() |
| await test_default_namespace() |
| |
| |
| # ----------------------------------------------------------------------------- |
| if __name__ == '__main__': |
| logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) |
| asyncio.run(run_tests()) |