| # Copyright 2021-2023 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import asyncio |
| import os |
| import pytest |
| import struct |
| import logging |
| from unittest import mock |
| |
| from bumble import device |
| from bumble.profiles import csip |
| from .test_utils import TwoDevices |
| |
| # ----------------------------------------------------------------------------- |
| # Logging |
| # ----------------------------------------------------------------------------- |
| logger = logging.getLogger(__name__) |
| |
| |
| # ----------------------------------------------------------------------------- |
def test_s1():
    """Check ``csip.s1`` on the byte-reversed ``b'SIRKenc'`` input.

    The result is compared with a fixed 16-byte value that is written as
    hex and then byte-reversed, like the input.
    """
    salt = csip.s1(b'SIRKenc'[::-1])
    expected = bytes.fromhex('6901983f 18149e82 3c7d133a 7d774572')[::-1]
    assert salt == expected
| |
| |
| # ----------------------------------------------------------------------------- |
def test_k1():
    """Check ``csip.k1`` with a fixed key, an ``s1``-derived salt and ``b'csis'``.

    All hex/ASCII inputs and the expected output are byte-reversed before
    being used, matching the other vector tests in this file.
    """
    key = bytes.fromhex('676e1b9b d448696f 061ec622 3ce5ced9')[::-1]
    salt = csip.s1(b'SIRKenc'[::-1])
    info = b'csis'[::-1]

    derived = csip.k1(key, salt, info)

    expected = bytes.fromhex('5277453c c094d982 b0e8ee53 2f2d1f8b')[::-1]
    assert derived == expected
| |
| |
| # ----------------------------------------------------------------------------- |
def test_sih():
    """Check ``csip.sih`` for a fixed SIRK and 3-byte PRAND against a fixed hash."""
    sirk = bytes.fromhex('457d7d09 21a1fd22 cecd8c86 dd72cccd')[::-1]
    prand = bytes.fromhex('69f563')[::-1]

    hashed = csip.sih(sirk, prand)

    expected = bytes.fromhex('1948da')[::-1]
    assert hashed == expected
| |
| |
| # ----------------------------------------------------------------------------- |
def test_sef():
    """Check ``csip.sef`` for a fixed key and SIRK against a fixed 16-byte result."""
    sirk = bytes.fromhex('457d7d09 21a1fd22 cecd8c86 dd72cccd')[::-1]
    key = bytes.fromhex('676e1b9b d448696f 061ec622 3ce5ced9')[::-1]

    encrypted = csip.sef(key, sirk)

    expected = bytes.fromhex('170a3835 e13524a0 7e2562d5 f25fd346')[::-1]
    assert encrypted == expected
| |
| |
| # ----------------------------------------------------------------------------- |
@pytest.mark.asyncio
@pytest.mark.parametrize(
    'sirk_type,', [(csip.SirkType.ENCRYPTED), (csip.SirkType.PLAINTEXT)]
)
async def test_csis(sirk_type):
    """Read the CSIS characteristics back through a client proxy.

    A ``CoordinatedSetIdentificationService`` is registered on the first
    device, the two devices are connected, and the proxy discovered from the
    second connection must return the SIRK (with its type), set size, lock
    state and rank that the service was created with.

    Args:
        sirk_type: ``csip.SirkType`` value the service is configured with.
    """
    sirk = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')
    ltk = bytes.fromhex('2f62c8ae41867d1bb619e788a2605faa')

    def as_uint8(value):
        # Characteristic values are compared as single unsigned bytes.
        return struct.pack('B', value)

    devices = TwoDevices()
    service = csip.CoordinatedSetIdentificationService(
        set_identity_resolving_key=sirk,
        set_identity_resolving_key_type=sirk_type,
        coordinated_set_size=2,
        set_member_lock=csip.MemberLock.UNLOCKED,
        set_member_rank=0,
    )
    devices[0].add_service(service)

    await devices.setup_connection()

    # Mock encryption: mark both links as encrypted and make both devices
    # hand out the same long term key.
    for index in (0, 1):
        devices.connections[index].encryption = 1
        devices[index].get_long_term_key = mock.AsyncMock(return_value=ltk)

    peer = device.Peer(devices.connections[1])
    csis_client = await peer.discover_service_and_create_proxy(
        csip.CoordinatedSetIdentificationProxy
    )

    read_sirk = await csis_client.read_set_identity_resolving_key()
    assert read_sirk == (sirk_type, sirk)

    set_size = await csis_client.coordinated_set_size.read_value()
    assert set_size == as_uint8(2)

    lock_state = await csis_client.set_member_lock.read_value()
    assert lock_state == as_uint8(csip.MemberLock.UNLOCKED)

    rank = await csis_client.set_member_rank.read_value()
    assert rank == as_uint8(0)
| |
| |
| # ----------------------------------------------------------------------------- |
async def run():
    """Run this module's tests directly, without the pytest runner.

    ``test_csis`` takes a required ``sirk_type`` argument that pytest normally
    supplies through ``parametrize``; when called directly it has to be passed
    explicitly, so both SIRK types are exercised here in turn.
    """
    # Synchronous crypto tests.
    test_s1()
    test_k1()
    test_sih()
    test_sef()

    # Same parameter set as the ``parametrize`` decorator on ``test_csis``.
    for sirk_type in (csip.SirkType.ENCRYPTED, csip.SirkType.PLAINTEXT):
        await test_csis(sirk_type)
| |
| |
| # ----------------------------------------------------------------------------- |
if __name__ == '__main__':
    # Script entry point: the log level comes from BUMBLE_LOGLEVEL and
    # defaults to INFO when the variable is not set.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(run())