| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import asyncio |
| import logging |
| import os |
| import struct |
| import pytest |
| |
| from bumble.controller import Controller |
| from bumble.link import LocalLink |
| from bumble.device import Device, Peer |
| from bumble.host import Host |
| from bumble.gatt import ( |
| GATT_BATTERY_LEVEL_CHARACTERISTIC, |
| CharacteristicAdapter, |
| DelegatedCharacteristicAdapter, |
| PackedCharacteristicAdapter, |
| MappedCharacteristicAdapter, |
| UTF8CharacteristicAdapter, |
| Service, |
| Characteristic, |
| CharacteristicValue |
| ) |
| from bumble.transport import AsyncPipeSink |
| from bumble.core import UUID |
| from bumble.att import ( |
| ATT_EXCHANGE_MTU_REQUEST, |
| ATT_ATTRIBUTE_NOT_FOUND_ERROR, |
| ATT_PDU, |
| ATT_Error_Response, |
| ATT_Read_By_Group_Type_Request |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
def basic_check(x):
    """Round-trip *x* through its byte encoding and compare string forms.

    ``x`` is serialized with ``to_bytes()``, parsed back with
    ``ATT_PDU.from_bytes`` and the ``str()`` of the original and of the
    re-parsed object are asserted to be equal.
    """
    reparsed = ATT_PDU.from_bytes(x.to_bytes())
    assert str(x) == str(reparsed)
| |
| |
| # ----------------------------------------------------------------------------- |
def test_UUID():
    """Check UUID construction, string rendering and byte round-trips."""
    long_form = '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # Short forms render with a size prefix
    assert str(UUID.from_16_bits(0x7788)) == 'UUID-16:7788'
    assert str(UUID.from_32_bits(0x11223344)) == 'UUID-32:11223344'

    # 128-bit form: from text, from its own string, and from its own bytes
    from_text = UUID(long_form)
    assert str(from_text) == long_form
    from_string = UUID(str(from_text))
    assert str(from_string) == long_form
    from_raw = UUID.from_bytes(from_string.to_bytes())
    assert str(from_raw) == long_form

    # A 16-bit UUID serialized as 128 bits compares equal once parsed back
    short = UUID.from_16_bits(0x1234)
    widened = UUID.from_bytes(short.to_bytes(force_128=True))
    assert short == widened

    # This 16-bit value renders with a name suffix
    named = UUID.from_16_bits(0x180a)
    assert str(named) == 'UUID-16:180A (Device Information)'
| |
| |
| # ----------------------------------------------------------------------------- |
def test_ATT_Error_Response():
    """Build an ATT_Error_Response and run it through basic_check."""
    fields = dict(
        request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
        attribute_handle_in_error=0x0000,
        error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR,
    )
    basic_check(ATT_Error_Response(**fields))
| |
| |
| # ----------------------------------------------------------------------------- |
def test_ATT_Read_By_Group_Type_Request():
    """Build an ATT_Read_By_Group_Type_Request and run it through basic_check."""
    fields = dict(
        starting_handle=0x0001,
        ending_handle=0xFFFF,
        attribute_group_type=UUID.from_16_bits(0x2800),
    )
    basic_check(ATT_Read_By_Group_Type_Request(**fields))
| |
| |
| # ----------------------------------------------------------------------------- |
def test_CharacteristicAdapter():
    """Exercise each characteristic adapter against one local Characteristic.

    For every adapter the test reads through the adapter, writes through the
    adapter, and compares the results with the expected encoded/decoded
    values.
    """
    initial = bytes([1, 2, 3])
    characteristic = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        initial
    )

    def flip(data):
        return bytes(reversed(data))

    def check_round_trip(adapter, decoded, encoded):
        # Reading yields the encoded form; after clearing the underlying
        # value, writing the encoded form makes the decoded one visible again.
        assert adapter.read_value(None) == encoded
        characteristic.value = None
        adapter.write_value(None, encoded)
        assert adapter.value == decoded

    # Check that the CharacteristicAdapter base class is transparent
    plain = CharacteristicAdapter(characteristic)
    assert plain.read_value(None) == initial

    replacement = bytes([3, 4, 5])
    plain.write_value(None, replacement)
    assert characteristic.value == replacement

    # Simple delegated adapter
    delegated = DelegatedCharacteristicAdapter(characteristic, flip, flip)
    assert delegated.read_value(None) == bytes(reversed(replacement))

    replacement = bytes([3, 4, 5])
    delegated.write_value(None, replacement)
    assert delegated.value == bytes(reversed(replacement))

    # Packed adapter with single element format
    number = 1234
    packed_number = struct.pack('>H', number)
    characteristic.value = number
    check_round_trip(
        PackedCharacteristicAdapter(characteristic, '>H'),
        number,
        packed_number
    )

    # Packed adapter with multi-element format
    first, second = 1234, 5678
    packed_pair = struct.pack('>HH', first, second)
    characteristic.value = (first, second)
    check_round_trip(
        PackedCharacteristicAdapter(characteristic, '>HH'),
        (first, second),
        packed_pair
    )

    # Mapped adapter
    first, second = 1234, 5678
    packed_pair = struct.pack('>HH', first, second)
    mapping = {'v1': first, 'v2': second}
    characteristic.value = mapping
    check_round_trip(
        MappedCharacteristicAdapter(characteristic, '>HH', ('v1', 'v2')),
        mapping,
        packed_pair
    )

    # UTF-8 adapter
    text = 'Hello π'
    encoded_text = text.encode('utf-8')
    characteristic.value = text
    check_round_trip(
        UTF8CharacteristicAdapter(characteristic),
        text,
        encoded_text
    )
| |
| |
| # ----------------------------------------------------------------------------- |
def test_CharacteristicValue():
    """Check that CharacteristicValue forwards read and write to its callbacks."""
    payload = bytes([1, 2, 3])

    # Read side: the callback's return value comes straight back
    readable = CharacteristicValue(read=lambda _: payload)
    assert readable.read(None) == payload

    # Write side: the callback receives the connection and the value
    recorded = []

    def record_write(connection, value):
        recorded.append((connection, value))

    writable = CharacteristicValue(write=record_write)
    marker = object()
    writable.write(marker, payload)
    assert recorded == [(marker, payload)]
| |
| |
| # ----------------------------------------------------------------------------- |
class TwoDevices:
    """Two devices, each with its own controller, sharing one LocalLink."""

    def __init__(self):
        self.connections = [None, None]

        # Both controllers are attached to the same link
        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2')
        ]

        # One device per controller; each host uses a sink wrapping its controller
        addresses = ('F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0')
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller))
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None]
| |
| |
| # ----------------------------------------------------------------------------- |
async def async_barrier():
    """Wait until a callback queued on the running loop has completed a future.

    A future is created on the running loop, ``set_result(None)`` is queued
    with ``call_soon``, and the coroutine awaits that future.
    """
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    loop.call_soon(done.set_result, None)
    await done
| |
| |
| # ----------------------------------------------------------------------------- |
@pytest.mark.asyncio
async def test_read_write():
    """Read and write two characteristics over a client/server connection.

    ``characteristic1`` holds its value directly and records writes through
    its 'write' event. ``characteristic2`` is backed by a
    ``CharacteristicValue`` whose read and write are delegated to callbacks.
    """
    [client, server] = TwoDevices().devices

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE
    )

    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # bytes(<str>) without an encoding raises TypeError on Python 3,
        # so the address text is encoded explicitly.
        return str(connection.peer_address).encode('utf-8')

    def on_characteristic2_write(connection, value):
        # characteristic2 is bound below; it is only looked up when called
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(read=on_characteristic2_read, write=on_characteristic2_write)
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2
        ]
    )
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1: starts empty, then reflects remote and local updates
    v1 = await peer.read_value(c1)
    assert v1 == b''
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(client.random_address)
    assert characteristic1._last_value[1] == b
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # characteristic2: the write callback sees the connection and the value
    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(client.random_address)
    assert characteristic2._last_value[1] == b
| |
| |
| # ----------------------------------------------------------------------------- |
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write one characteristic via service-level discovery.

    The characteristic is accessed both directly and through a
    ``PackedCharacteristicAdapter`` using the '>I' format.
    """
    [client, server] = TwoDevices().devices

    initial = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial
    )
    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Discover the service first, then its characteristics from the service
    await peer.discover_services()
    services = peer.get_services_by_uuid(service1.uuid)
    assert len(services) == 1
    remote_service = services[0]
    await remote_service.discover_characteristics()
    found = remote_service.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(found) == 1
    remote = found[0]

    # Raw read
    raw = await remote.read_value()
    assert raw == initial

    # Read through the packed adapter
    packed = PackedCharacteristicAdapter(remote, '>I')
    number = await packed.read_value()
    (expected_number,) = struct.unpack('>I', initial)
    assert number == expected_number

    # Write through the packed adapter, then read back
    updated = bytes([0x55, 0x66, 0x77, 0x88])
    (updated_number,) = struct.unpack('>I', updated)
    await packed.write_value(updated_number)
    await async_barrier()
    assert characteristic1.value == updated
    number = await packed.read_value()
    assert number == struct.unpack('>I', updated)[0]
| |
| |
| # ----------------------------------------------------------------------------- |
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Subscribe to three characteristics and check which updates arrive.

    The server exposes one NOTIFY, one INDICATE and one NOTIFY|INDICATE
    characteristic. The client subscribes to each and the test asserts
    whether notifications/indications from the server reach the client
    handlers.
    """
    [client, server] = TwoDevices().devices

    def subscription_recorder(target):
        # Handler storing the latest subscription arguments on `target`
        def on_subscription(connection, notify_enabled, indicate_enabled):
            target._last_subscription = (connection, notify_enabled, indicate_enabled)
        return on_subscription

    def update_recorder(target):
        # Handler storing the latest (connection, value) update on `target`
        def on_update(connection, value):
            target._last_update = (connection, value)
        return on_update

    notify_char = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3])
    )
    notify_char.on('subscription', subscription_recorder(notify_char))

    indicate_char = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6])
    )
    indicate_char.on('subscription', subscription_recorder(indicate_char))

    both_char = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9])
    )
    both_char.on('subscription', subscription_recorder(both_char))

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [notify_char, indicate_char, both_char]
    )
    server.add_services([service1])

    def on_characteristic_subscription(connection, characteristic, notify_enabled, indicate_enabled):
        server._last_subscription = (connection, characteristic, notify_enabled, indicate_enabled)

    server.on('characteristic_subscription', on_characteristic_subscription)

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()

    def only_match(uuid):
        # Exactly one discovered characteristic is expected per UUID
        matches = peer.get_characteristics_by_uuid(uuid)
        assert len(matches) == 1
        return matches[0]

    notify_proxy = only_match(notify_char.uuid)
    indicate_proxy = only_match(indicate_char.uuid)
    both_proxy = only_match(both_char.uuid)

    # --- NOTIFY-only characteristic, handler registered via the 'update' event
    notify_proxy._last_update = None
    notify_proxy.on('update', update_recorder(notify_proxy))
    await peer.subscribe(notify_proxy)
    await async_barrier()
    _, subscribed, notify_on, indicate_on = server._last_subscription
    assert subscribed == notify_char
    assert notify_on
    assert not indicate_on
    _, notify_on, indicate_on = notify_char._last_subscription
    assert notify_on
    assert not indicate_on
    await server.indicate_subscribers(notify_char)
    await async_barrier()
    assert notify_proxy._last_update is None
    await server.notify_subscribers(notify_char)
    await async_barrier()
    assert notify_proxy._last_update is not None
    assert notify_proxy._last_update[1] == notify_char.value

    # --- INDICATE-only characteristic, handler passed directly to subscribe()
    indicate_proxy._last_update = None

    def on_indicate_update(value):
        # Takes only the value; the connection comes from the enclosing scope
        indicate_proxy._last_update = (connection, value)

    await peer.subscribe(indicate_proxy, on_indicate_update)
    await async_barrier()
    indicate_subscriber = indicate_char._last_subscription[0]
    await server.notify_subscriber(indicate_subscriber, indicate_char)
    await async_barrier()
    assert indicate_proxy._last_update is None
    indicate_subscriber = indicate_char._last_subscription[0]
    await server.indicate_subscriber(indicate_subscriber, indicate_char)
    await async_barrier()
    assert indicate_proxy._last_update is not None
    assert indicate_proxy._last_update[1] == indicate_char.value

    # --- NOTIFY|INDICATE characteristic, handler registered via 'update'
    both_proxy._last_update = None
    both_proxy.on('update', update_recorder(both_proxy))
    await peer.subscribe(both_proxy)
    await async_barrier()
    await server.notify_subscriber(both_char._last_subscription[0], both_char)
    await async_barrier()
    assert both_proxy._last_update is not None
    assert both_proxy._last_update[1] == both_char.value
    both_char.value = bytes([1, 2, 3])
    await server.indicate_subscriber(both_char._last_subscription[0], both_char)
    await async_barrier()
    assert both_proxy._last_update is not None
    assert both_proxy._last_update[1] == both_char.value
| |
| |
| # ----------------------------------------------------------------------------- |
async def async_main():
    """Run the coroutine-based tests one after another."""
    for run_test in (test_read_write, test_read_write2, test_subscribe_notify):
        await run_test()
| |
| # ----------------------------------------------------------------------------- |
if __name__ == '__main__':
    # Log level comes from BUMBLE_LOGLEVEL, defaulting to INFO
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    # Synchronous tests first, then the coroutine-based ones
    for run_test in (
        test_UUID,
        test_ATT_Error_Response,
        test_ATT_Read_By_Group_Type_Request,
        test_CharacteristicValue,
        test_CharacteristicAdapter,
    ):
        run_test()
    asyncio.run(async_main())