blob: 9630fde159c75fe94e50ff9f4660781e551f4439 [file] [log] [blame] [edit]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import struct
import pytest
from bumble.controller import Controller
from bumble.gatt_client import CharacteristicProxy
from bumble.link import LocalLink
from bumble.device import Device, Peer
from bumble.host import Host
from bumble.gatt import (
GATT_BATTERY_LEVEL_CHARACTERISTIC,
GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
CharacteristicAdapter,
DelegatedCharacteristicAdapter,
PackedCharacteristicAdapter,
MappedCharacteristicAdapter,
UTF8CharacteristicAdapter,
Service,
Characteristic,
CharacteristicValue
)
from bumble.transport import AsyncPipeSink
from bumble.core import UUID
from bumble.att import (
ATT_EXCHANGE_MTU_REQUEST,
ATT_ATTRIBUTE_NOT_FOUND_ERROR,
ATT_PDU,
ATT_Error_Response,
ATT_Read_By_Group_Type_Request
)
# -----------------------------------------------------------------------------
def basic_check(x):
    """Assert that a PDU survives a serialize/parse round trip.

    ``x`` is serialized with ``to_bytes()``, parsed back with
    ``ATT_PDU.from_bytes()``, and the string forms of the original and of
    the re-parsed object are required to be equal.

    Args:
        x: object exposing ``to_bytes()`` (callers in this file pass ATT PDUs).

    Raises:
        AssertionError: if the two string representations differ.
    """
    reparsed = ATT_PDU.from_bytes(x.to_bytes())
    assert str(x) == str(reparsed)
# -----------------------------------------------------------------------------
def test_UUID():
    """Check UUID construction, string rendering and byte round trips."""
    full_text = '61A3512C-09BE-4DDC-A6A6-0B03667AAFC6'

    # Short forms built from integers
    short_16 = UUID.from_16_bits(0x7788)
    assert str(short_16) == 'UUID-16:7788'
    short_32 = UUID.from_32_bits(0x11223344)
    assert str(short_32) == 'UUID-32:11223344'

    # 128-bit form: from text, from its own string form, and from its bytes
    from_text = UUID(full_text)
    assert str(from_text) == full_text
    from_str = UUID(str(from_text))
    assert str(from_str) == full_text
    from_raw = UUID.from_bytes(from_str.to_bytes())
    assert str(from_raw) == full_text

    # A 16-bit UUID serialized with force_128 must compare equal once parsed back
    compact = UUID.from_16_bits(0x1234)
    expanded_bytes = compact.to_bytes(force_128=True)
    reparsed = UUID.from_bytes(expanded_bytes)
    assert compact == reparsed

    # The string form of this 16-bit UUID includes a human-readable name
    device_information = UUID.from_16_bits(0x180a)
    assert str(device_information) == 'UUID-16:180A (Device Information)'
# -----------------------------------------------------------------------------
def test_ATT_Error_Response():
    """Round-trip an ``ATT_Error_Response`` PDU through ``basic_check``."""
    basic_check(
        ATT_Error_Response(
            request_opcode_in_error=ATT_EXCHANGE_MTU_REQUEST,
            attribute_handle_in_error=0x0000,
            error_code=ATT_ATTRIBUTE_NOT_FOUND_ERROR,
        )
    )
# -----------------------------------------------------------------------------
def test_ATT_Read_By_Group_Type_Request():
    """Round-trip an ``ATT_Read_By_Group_Type_Request`` PDU through ``basic_check``."""
    # Request covering the whole handle range, grouped by the 16-bit UUID 0x2800
    group_type = UUID.from_16_bits(0x2800)
    basic_check(
        ATT_Read_By_Group_Type_Request(
            starting_handle=0x0001,
            ending_handle=0xFFFF,
            attribute_group_type=group_type,
        )
    )
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_characteristic_encoding():
    """Exercise value encoding/decoding on the server and client sides.

    Covers a ``Characteristic`` subclass, a ``CharacteristicProxy`` subclass
    and ``DelegatedCharacteristicAdapter`` for reads, writes and
    subscriptions between two linked devices.
    """

    # Local characteristic subclass that maps an int to/from a single byte
    class Foo(Characteristic):
        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    # Reading yields the encoded byte; writing a byte stores the decoded int
    c = Foo(GATT_BATTERY_LEVEL_CHARACTERISTIC, Characteristic.READ, Characteristic.READABLE, 123)
    x = c.read_value(None)
    assert x == bytes([123])
    c.write_value(None, bytes([122]))
    assert c.value == 122

    # Client-side proxy subclass with the same int <-> single byte mapping,
    # built from an already-discovered characteristic proxy
    class FooProxy(CharacteristicProxy):
        def __init__(self, characteristic):
            super().__init__(
                characteristic.client,
                characteristic.handle,
                characteristic.end_group_handle,
                characteristic.uuid,
                characteristic.properties
            )

        def encode_value(self, value):
            return bytes([value])

        def decode_value(self, value_bytes):
            return value_bytes[0]

    # Set up a server exposing one readable/writeable/notifying characteristic
    [client, server] = LinkedDevices().devices[:2]
    characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123])
    )
    service = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [characteristic]
    )
    server.add_service(service)

    # Connect the client and discover the remote characteristic
    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)
    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic.uuid)
    assert len(c) == 1
    c = c[0]

    # Reads and writes through FooProxy use ints on the client side while the
    # server-side characteristic holds the single-byte form
    cp = FooProxy(c)
    v = await cp.read_value()
    assert v == 123
    await cp.write_value(124)
    # Yield to the event loop before checking the server-side value
    await async_barrier()
    assert characteristic.value == bytes([124])
    v = await cp.read_value()
    assert v == 124
    await cp.write_value(125, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([125])

    # Delegated adapter with only an encode function: 100 is written as 50
    cd = DelegatedCharacteristicAdapter(c, encode=lambda x: bytes([x // 2]))
    await cd.write_value(100, with_response=True)
    await async_barrier()
    assert characteristic.value == bytes([50])

    # Subscription callback shared by all the checks below; it records the
    # last value it was called with
    last_change = None

    def on_change(value):
        nonlocal last_change
        last_change = value

    # Subscribing through the plain proxy: the callback receives raw bytes
    await c.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value
    last_change = None
    await server.notify_subscribers(characteristic, value=bytes([125]))
    await async_barrier()
    assert last_change == bytes([125])
    last_change = None
    # After unsubscribing, the callback must not be invoked
    await c.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None

    # Subscribing through FooProxy: the callback receives the decoded int
    await cp.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value[0]
    last_change = None
    await server.notify_subscribers(characteristic, value=bytes([126]))
    await async_barrier()
    assert last_change == 126
    last_change = None
    await cp.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None

    # Subscribing through a delegated adapter with only a decode function:
    # the callback receives the first byte of the value
    cd = DelegatedCharacteristicAdapter(c, decode=lambda x: x[0])
    await cd.subscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change == characteristic.value[0]
    last_change = None
    await cd.unsubscribe(on_change)
    await server.notify_subscribers(characteristic)
    await async_barrier()
    assert last_change is None
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_attribute_getters():
    """Check the GATT server attribute lookup helpers and the handle layout.

    A service with a single notifying characteristic is added to the server,
    then the service, characteristic and descriptor attributes are looked up
    by UUID and their handles are compared.
    """
    _, server = LinkedDevices().devices[:2]

    char_uuid = UUID('FDB159DB-036C-49E3-B3DB-6325AC750806')
    characteristic = Characteristic(
        char_uuid,
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    svc_uuid = UUID('3A657F47-D34F-46B3-B1EC-698E29B6B829')
    server.add_service(Service(svc_uuid, [characteristic]))

    # Each getter must return something truthy
    service_attribute = server.gatt_server.get_service_attribute(svc_uuid)
    assert service_attribute

    declaration, value_attribute = server.gatt_server.get_characteristic_attributes(
        svc_uuid, char_uuid
    )
    assert declaration and value_attribute

    descriptor = server.gatt_server.get_descriptor_attribute(
        svc_uuid,
        char_uuid,
        GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR,
    )
    assert descriptor

    # Handles increase from service to declaration to value to descriptor,
    # and the descriptor is the last attribute of the service group
    assert (
        service_attribute.handle
        < declaration.handle
        < value_attribute.handle
        < descriptor.handle
        == service_attribute.end_group_handle
    )

    # The value attribute immediately follows the characteristic declaration
    assert declaration.handle + 1 == value_attribute.handle
# -----------------------------------------------------------------------------
def test_CharacteristicAdapter():
    """Check each characteristic adapter against one local ``Characteristic``.

    For every adapter the test reads through the adapter and compares with the
    expected encoded bytes, then writes encoded bytes through the adapter and
    compares the resulting value.
    """

    def flipped(data):
        # Byte-order reversal used for both callbacks of the delegated adapter
        return bytes(reversed(data))

    # The base adapter passes values through unchanged
    raw = bytes([1, 2, 3])
    characteristic = Characteristic(
        GATT_BATTERY_LEVEL_CHARACTERISTIC,
        Characteristic.READ,
        Characteristic.READABLE,
        raw,
    )
    adapter = CharacteristicAdapter(characteristic)
    read_back = adapter.read_value(None)
    assert read_back == raw

    raw = bytes([3, 4, 5])
    adapter.write_value(None, raw)
    assert characteristic.value == raw

    # Delegated adapter whose two callbacks both reverse the bytes
    adapter = DelegatedCharacteristicAdapter(characteristic, flipped, flipped)
    read_back = adapter.read_value(None)
    assert read_back == flipped(raw)

    raw = bytes([3, 4, 5])
    adapter.write_value(None, raw)
    assert adapter.value == flipped(raw)

    # Packed adapter, single-element format
    number = 1234
    packed = struct.pack('>H', number)
    characteristic.value = number
    adapter = PackedCharacteristicAdapter(characteristic, '>H')
    read_back = adapter.read_value(None)
    assert read_back == packed

    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == number

    # Packed adapter, multi-element format
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    characteristic.value = (first, second)
    adapter = PackedCharacteristicAdapter(characteristic, '>HH')
    read_back = adapter.read_value(None)
    assert read_back == packed

    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == (first, second)

    # Mapped adapter: same packed layout, value exposed as a dict
    first, second = 1234, 5678
    packed = struct.pack('>HH', first, second)
    as_mapping = {'v1': first, 'v2': second}
    characteristic.value = as_mapping
    adapter = MappedCharacteristicAdapter(characteristic, '>HH', ('v1', 'v2'))
    read_back = adapter.read_value(None)
    assert read_back == packed

    characteristic.value = None
    adapter.write_value(None, packed)
    assert adapter.value == as_mapping

    # UTF-8 adapter, with a non-ASCII character in the text
    text = 'Hello π'
    encoded_text = text.encode('utf-8')
    characteristic.value = text
    adapter = UTF8CharacteristicAdapter(characteristic)
    read_back = adapter.read_value(None)
    assert read_back == encoded_text

    characteristic.value = None
    adapter.write_value(None, encoded_text)
    assert adapter.value == text
# -----------------------------------------------------------------------------
def test_CharacteristicValue():
    """Check that ``CharacteristicValue`` forwards reads and writes to its callbacks."""
    payload = bytes([1, 2, 3])

    # read() returns whatever the read callback returns
    def read_payload(_):
        return payload

    readable = CharacteristicValue(read=read_payload)
    observed = readable.read(None)
    assert observed == payload

    # write() hands the connection and the value to the write callback
    recorded = []

    def record_write(connection, value):
        recorded.append((connection, value))

    writable = CharacteristicValue(write=record_write)
    marker = object()
    writable.write(marker, payload)
    assert recorded == [(marker, payload)]
# -----------------------------------------------------------------------------
class LinkedDevices:
    """Three devices whose controllers share one ``LocalLink``.

    Attributes set by the constructor:
        connections: three-slot list, initially all ``None``.
        link: the ``LocalLink`` passed to every controller.
        controllers: three ``Controller`` objects named ``C1``, ``C2``, ``C3``.
        devices: one ``Device`` per controller, each with its own address and
            a ``Host`` built from that controller.
        paired: three-slot list, initially all ``None``.
    """

    def __init__(self):
        self.connections = [None] * 3

        self.link = LocalLink()
        self.controllers = [
            Controller(name, link=self.link) for name in ('C1', 'C2', 'C3')
        ]

        # Device i is attached to controller i
        addresses = (
            'F0:F1:F2:F3:F4:F5',
            'F1:F2:F3:F4:F5:F6',
            'F2:F3:F4:F5:F6:F7',
        )
        self.devices = [
            Device(
                address=address,
                host=Host(controller, AsyncPipeSink(controller)),
            )
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None] * 3
# -----------------------------------------------------------------------------
async def async_barrier():
    """Suspend the caller until the event loop has run a freshly scheduled callback.

    A future is created on the running loop, a ``call_soon`` callback is
    scheduled to resolve it, and the coroutine waits for that future. The
    tests use this to let pending loop work run before asserting.
    """
    loop = asyncio.get_running_loop()
    released = loop.create_future()
    loop.call_soon(released.set_result, None)
    await released
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write():
    """Read and write characteristic values from a client to a server.

    Two characteristics are exposed by the server: ``characteristic1`` stores
    its value directly and records writes through a ``'write'`` listener,
    while ``characteristic2`` is backed by read/write callbacks wrapped in a
    ``CharacteristicValue``.
    """
    [client, server] = LinkedDevices().devices[:2]

    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE
    )

    # Record the (connection, value) pair of the last write to characteristic1
    def on_characteristic1_write(connection, value):
        characteristic1._last_value = (connection, value)

    characteristic1.on('write', on_characteristic1_write)

    def on_characteristic2_read(connection):
        # Converting str to bytes requires an explicit encoding in Python 3;
        # without one, bytes(str) raises TypeError.
        return bytes(str(connection.peer_address), 'ascii')

    # Record the (connection, value) pair of the last write to characteristic2
    def on_characteristic2_write(connection, value):
        characteristic2._last_value = (connection, value)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        CharacteristicValue(read=on_characteristic2_read, write=on_characteristic2_write)
    )

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2
        ]
    )
    server.add_services([service1])

    # Connect and discover both characteristics from the client side
    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]

    # characteristic1 was created without a value: it reads back as empty bytes
    v1 = await peer.read_value(c1)
    assert v1 == b''

    # Write from the client, then check the server-side value and a read-back
    b = bytes([1, 2, 3])
    await peer.write_value(c1, b)
    await async_barrier()
    assert characteristic1.value == b
    v1 = await peer.read_value(c1)
    assert v1 == b

    # The 'write' listener must have seen the client connection and the value
    assert isinstance(characteristic1._last_value, tuple)
    assert len(characteristic1._last_value) == 2
    assert str(characteristic1._last_value[0].peer_address) == str(client.random_address)
    assert characteristic1._last_value[1] == b

    # A value changed on the server side is visible to client reads
    bb = bytes([3, 4, 5, 6])
    characteristic1.value = bb
    v1 = await peer.read_value(c1)
    assert v1 == bb

    # Writes to characteristic2 go through the CharacteristicValue write callback
    await peer.write_value(c2, b)
    await async_barrier()
    assert isinstance(characteristic2._last_value, tuple)
    assert len(characteristic2._last_value) == 2
    assert str(characteristic2._last_value[0].peer_address) == str(client.random_address)
    assert characteristic2._last_value[1] == b
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_read_write2():
    """Read and write one characteristic via service-level discovery.

    The characteristic is reached through its service proxy and is accessed
    both directly (raw bytes) and through a ``PackedCharacteristicAdapter``
    using the ``'>I'`` format.
    """
    client, server = LinkedDevices().devices[:2]

    initial_bytes = bytes([0x11, 0x22, 0x33, 0x44])
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        value=initial_bytes,
    )
    service1 = Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [characteristic1])
    server.add_services([service1])

    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # Find the service first, then discover characteristics within it
    await peer.discover_services()
    matching_services = peer.get_services_by_uuid(service1.uuid)
    assert len(matching_services) == 1
    service_proxy = matching_services[0]

    await service_proxy.discover_characteristics()
    matching_characteristics = service_proxy.get_characteristics_by_uuid(
        characteristic1.uuid
    )
    assert len(matching_characteristics) == 1
    raw_proxy = matching_characteristics[0]

    # Raw read returns the bytes stored on the server
    observed = await raw_proxy.read_value()
    assert observed == initial_bytes

    # Packed read returns those bytes unpacked with the '>I' format
    packed_proxy = PackedCharacteristicAdapter(raw_proxy, '>I')
    observed = await packed_proxy.read_value()
    assert observed == struct.unpack('>I', initial_bytes)[0]

    # Packed write stores the packed bytes on the server
    updated_bytes = bytes([0x55, 0x66, 0x77, 0x88])
    await packed_proxy.write_value(struct.unpack('>I', updated_bytes)[0])
    await async_barrier()
    assert characteristic1.value == updated_bytes

    observed = await packed_proxy.read_value()
    assert observed == struct.unpack('>I', updated_bytes)[0]
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_subscribe_notify():
    """Exercise subscriptions, notifications and indications end to end.

    The server exposes three characteristics: ``characteristic1`` with
    NOTIFY, ``characteristic2`` with INDICATE, and ``characteristic3`` with
    both. For each one the client subscribes, the server sends notifications
    and/or indications, and the test checks which client-side callbacks were
    invoked. After each unsubscribe it checks that none are invoked.
    """
    [client, server] = LinkedDevices().devices[:2]

    # Each characteristic records the arguments of its last 'subscription' event
    characteristic1 = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.NOTIFY,
        Characteristic.READABLE,
        bytes([1, 2, 3])
    )

    def on_characteristic1_subscription(connection, notify_enabled, indicate_enabled):
        characteristic1._last_subscription = (connection, notify_enabled, indicate_enabled)

    characteristic1.on('subscription', on_characteristic1_subscription)

    characteristic2 = Characteristic(
        '66DE9057-C848-4ACA-B993-D675644EBB85',
        Characteristic.READ | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([4, 5, 6])
    )

    def on_characteristic2_subscription(connection, notify_enabled, indicate_enabled):
        characteristic2._last_subscription = (connection, notify_enabled, indicate_enabled)

    characteristic2.on('subscription', on_characteristic2_subscription)

    characteristic3 = Characteristic(
        'AB5E639C-40C1-4238-B9CB-AF41F8B806E4',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([7, 8, 9])
    )

    def on_characteristic3_subscription(connection, notify_enabled, indicate_enabled):
        characteristic3._last_subscription = (connection, notify_enabled, indicate_enabled)

    characteristic3.on('subscription', on_characteristic3_subscription)

    service1 = Service(
        '3A657F47-D34F-46B3-B1EC-698E29B6B829',
        [
            characteristic1,
            characteristic2,
            characteristic3
        ]
    )
    server.add_services([service1])

    # The server device also records its last 'characteristic_subscription' event
    def on_characteristic_subscription(connection, characteristic, notify_enabled, indicate_enabled):
        server._last_subscription = (connection, characteristic, notify_enabled, indicate_enabled)

    server.on('characteristic_subscription', on_characteristic_subscription)

    # Connect and discover the three characteristics from the client side
    await client.power_on()
    await server.power_on()
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    await peer.discover_services()
    await peer.discover_characteristics()
    c = peer.get_characteristics_by_uuid(characteristic1.uuid)
    assert len(c) == 1
    c1 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic2.uuid)
    assert len(c) == 1
    c2 = c[0]
    c = peer.get_characteristics_by_uuid(characteristic3.uuid)
    assert len(c) == 1
    c3 = c[0]

    # ---- characteristic1: NOTIFY, observed through the proxy's 'update' event
    c1._called = False
    c1._last_update = None

    def on_c1_update(value):
        c1._called = True
        c1._last_update = value

    c1.on('update', on_c1_update)
    await peer.subscribe(c1)
    await async_barrier()

    # Both the device-level and the characteristic-level events must report
    # notifications enabled and indications disabled
    assert server._last_subscription[1] == characteristic1
    assert server._last_subscription[2]
    assert not server._last_subscription[3]
    assert characteristic1._last_subscription[1]
    assert not characteristic1._last_subscription[2]

    # An indication must not trigger the update callback here
    await server.indicate_subscribers(characteristic1)
    await async_barrier()
    assert not c1._called

    # A notification delivers the current server-side value
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert c1._called
    assert c1._last_update == characteristic1.value

    # Notifying with an explicit value delivers that value and leaves the
    # characteristic's stored value untouched
    c1._called = False
    c1._last_update = None
    c1_value = characteristic1.value
    await server.notify_subscribers(characteristic1, bytes([0, 1, 2]))
    await async_barrier()
    assert c1._called
    assert c1._last_update == bytes([0, 1, 2])
    assert characteristic1.value == c1_value

    # After unsubscribing, no update may be delivered. The barrier lets the
    # event loop run before the check, as in every other post-notify check in
    # this test, so the assertion is not evaluated too early.
    c1._called = False
    await peer.unsubscribe(c1)
    await server.notify_subscribers(characteristic1)
    await async_barrier()
    assert not c1._called

    # ---- characteristic2: INDICATE, observed through a subscriber callback
    c2._called = False
    c2._last_update = None

    def on_c2_update(value):
        c2._called = True
        c2._last_update = value

    await peer.subscribe(c2, on_c2_update)
    await async_barrier()

    # A notification must not trigger the callback here
    await server.notify_subscriber(characteristic2._last_subscription[0], characteristic2)
    await async_barrier()
    assert not c2._called

    # An indication delivers the current server-side value
    await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
    await async_barrier()
    assert c2._called
    assert c2._last_update == characteristic2.value

    # After unsubscribing, indications are no longer delivered
    c2._called = False
    await peer.unsubscribe(c2, on_c2_update)
    await server.indicate_subscriber(characteristic2._last_subscription[0], characteristic2)
    await async_barrier()
    assert not c2._called

    # ---- characteristic3: NOTIFY and INDICATE, with an 'update' listener
    # plus one subscriber callback for each delivery mode
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    c3._last_update = None
    c3._last_update_2 = None
    c3._last_update_3 = None

    def on_c3_update(value):
        c3._called = True
        c3._last_update = value

    def on_c3_update_2(value):  # for notify
        c3._called_2 = True
        c3._last_update_2 = value

    def on_c3_update_3(value):  # for indicate
        c3._called_3 = True
        c3._last_update_3 = value

    c3.on('update', on_c3_update)
    await peer.subscribe(c3, on_c3_update_2)
    await async_barrier()

    # A notification reaches the 'update' listener and the notify callback only
    await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert c3._called_2
    assert c3._last_update_2 == characteristic3.value
    assert not c3._called_3

    # Re-subscribe with prefer_notify=False, then send an indication: it
    # reaches the 'update' listener and the indicate callback only
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await peer.subscribe(c3, on_c3_update_3, prefer_notify=False)
    await async_barrier()
    characteristic3.value = bytes([1, 2, 3])
    await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert c3._called
    assert c3._last_update == characteristic3.value
    assert not c3._called_2
    assert c3._called_3
    assert c3._last_update_3 == characteristic3.value

    # After the final unsubscribe, neither delivery mode reaches any callback
    c3._called = False
    c3._called_2 = False
    c3._called_3 = False
    await peer.unsubscribe(c3)
    await server.notify_subscriber(characteristic3._last_subscription[0], characteristic3)
    await server.indicate_subscriber(characteristic3._last_subscription[0], characteristic3)
    await async_barrier()
    assert not c3._called
    assert not c3._called_2
    assert not c3._called_3
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_mtu_exchange():
    """Check MTU negotiation for two clients connected to the same server.

    The server's GATT ``max_mtu`` is set to 100. One client requests 220 and
    must end up with 100; the other requests 50 and must end up with 50.
    """
    first_client, second_client, server = LinkedDevices().devices[:3]
    server.gatt_server.max_mtu = 100

    # Collect every connection reported on the server side
    server_connections = []

    def on_server_connection(connection):
        server_connections.append(connection)

    server.on('connection', on_server_connection)

    await first_client.power_on()
    await second_client.power_on()
    await server.power_on()

    # Each connect must produce exactly one new server-side connection
    first_connection = await first_client.connect(server.random_address)
    assert len(server_connections) == 1
    assert server_connections[0] is not None

    second_connection = await second_client.connect(server.random_address)
    assert len(server_connections) == 2
    assert server_connections[1] is not None

    first_peer = Peer(first_connection)
    second_peer = Peer(second_connection)

    # A request above the server maximum results in the server maximum
    negotiated = await first_peer.request_mtu(220)
    assert negotiated == 100
    assert first_connection.att_mtu == 100

    # A request below the server maximum is kept as requested
    negotiated = await second_peer.request_mtu(50)
    assert negotiated == 50
    assert second_connection.att_mtu == 50
# -----------------------------------------------------------------------------
def test_char_property_to_string():
    """Check the conversion of characteristic property bits to their names."""
    # A single property, given as a raw bit value and as the named constant
    for single in (0x01, Characteristic.BROADCAST):
        assert Characteristic.property_name(single) == "BROADCAST"

    # Two properties combined, given as a raw value and as OR-ed constants
    for combined in (0x03, Characteristic.BROADCAST | Characteristic.READ):
        assert Characteristic.properties_as_string(combined) == "BROADCAST,READ"
# -----------------------------------------------------------------------------
def test_char_property_string_to_type():
    """Check the parsing of property names back into property bits."""
    # A single name
    parsed = Characteristic.string_to_properties("BROADCAST")
    assert parsed == Characteristic.BROADCAST

    # Two comma-separated names give the same bits in either order
    broadcast_and_read = Characteristic.BROADCAST | Characteristic.READ
    for names in ("BROADCAST,READ", "READ,BROADCAST"):
        assert Characteristic.string_to_properties(names) == broadcast_and_read
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_server_string():
    """Check the string form of a GATT server's attribute table.

    The expected dump lists a Generic Access service first, followed by the
    service added by this test and its characteristic and descriptor.
    """
    _, server = LinkedDevices().devices[:2]

    notifying_characteristic = Characteristic(
        'FDB159DB-036C-49E3-B3DB-6325AC750806',
        Characteristic.READ | Characteristic.WRITE | Characteristic.NOTIFY,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([123]),
    )
    server.add_service(
        Service('3A657F47-D34F-46B3-B1EC-698E29B6B829', [notifying_characteristic])
    )

    # One line per attribute; continuation lines start at column 0 because
    # they are part of the string literal
    expected_dump = """Service(handle=0x0001, end=0x0005, uuid=UUID-16:1800 (Generic Access))
CharacteristicDeclaration(handle=0x0002, value_handle=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
Characteristic(handle=0x0003, end=0x0003, uuid=UUID-16:2A00 (Device Name), properties=READ)
CharacteristicDeclaration(handle=0x0004, value_handle=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Characteristic(handle=0x0005, end=0x0005, uuid=UUID-16:2A01 (Appearance), properties=READ)
Service(handle=0x0006, end=0x0009, uuid=3A657F47-D34F-46B3-B1EC-698E29B6B829)
CharacteristicDeclaration(handle=0x0007, value_handle=0x0008, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Characteristic(handle=0x0008, end=0x0009, uuid=FDB159DB-036C-49E3-B3DB-6325AC750806, properties=READ,WRITE,NOTIFY)
Descriptor(handle=0x0009, type=UUID-16:2902 (Client Characteristic Configuration), value=0000)"""
    assert str(server.gatt_server) == expected_dump
# -----------------------------------------------------------------------------
async def async_main():
    """Run the coroutine-based tests one after another, in a fixed order."""
    coroutine_tests = (
        test_read_write,
        test_read_write2,
        test_subscribe_notify,
        test_characteristic_encoding,
        test_mtu_exchange,
    )
    for coroutine_test in coroutine_tests:
        await coroutine_test()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # The log level is taken from the BUMBLE_LOGLEVEL environment variable,
    # falling back to INFO when it is not set
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    # Plain (synchronous) tests first, then the coroutine-based ones
    for sync_test in (
        test_UUID,
        test_ATT_Error_Response,
        test_ATT_Read_By_Group_Type_Request,
        test_CharacteristicValue,
        test_CharacteristicAdapter,
    ):
        sync_test()
    asyncio.run(async_main())