# blob: 9a529dd2c006b1e07e248f63416ea9d760b6afcf [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Bumble Tool
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import random
import re
import humanize
from typing import Optional, Union
from collections import OrderedDict
import click
from prettytable import PrettyTable
from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
from prompt_toolkit.completion import Completer, Completion, NestedCompleter
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.formatted_text import ANSI
from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import (
Layout,
HSplit,
Window,
CompletionsMenu,
Float,
FormattedTextControl,
FloatContainer,
ConditionalContainer,
Dimension,
)
from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
from bumble.gatt_client import CharacteristicProxy
from bumble.hci import (
HCI_Constant,
HCI_LE_1M_PHY,
HCI_LE_2M_PHY,
HCI_LE_CODED_PHY,
)
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
# Per-user directory; the console's command history file is stored in it
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
# Width, in character cells, of a full-scale RSSI bar
DEFAULT_RSSI_BAR_WIDTH = 20
# Timeout passed to Device.connect() (presumably seconds -- TODO confirm)
DEFAULT_CONNECTION_TIMEOUT = 30.0
# RSSI values at or below this are drawn as an empty bar
DISPLAY_MIN_RSSI = -100
# RSSI values at or above this are drawn as a full bar
DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0 # Seconds slept between two RSSI polls
# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
def le_phy_name(phy_id):
    """Return a short display name for an LE PHY identifier.

    The three PHYs known to this tool map to '1M', '2M' and 'CODED'; any
    other identifier falls back to the name given by HCI_Constant.
    """
    short_names = {
        HCI_LE_1M_PHY: '1M',
        HCI_LE_2M_PHY: '2M',
        HCI_LE_CODED_PHY: 'CODED',
    }
    fallback_name = HCI_Constant.le_phy_name(phy_id)
    return short_names.get(phy_id, fallback_name)
def rssi_bar(rssi):
    """Render an RSSI value as text: the number followed by a block-character bar.

    The bar length is proportional to where `rssi` sits between
    DISPLAY_MIN_RSSI and DISPLAY_MAX_RSSI (clamped to that range), with a
    resolution of one eighth of a character cell.
    """
    eighths = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']

    # Position of the value within the displayed range, clamped to [0, 1]
    fraction = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
    if fraction < 0:
        fraction = 0
    elif fraction > 1:
        fraction = 1

    # Each character cell is worth 8 ticks
    ticks = int(fraction * DEFAULT_RSSI_BAR_WIDTH * 8)
    full_cells, partial = divmod(ticks, 8)
    bar = '█' * full_cells + eighths[partial]
    return f'{rssi:4} {bar}'
def parse_phys(phys):
    """Parse a comma-separated list of PHY names (case-insensitive).

    Returns None for the wildcard '*', otherwise the list of matching HCI PHY
    constants, in the order given. Raises ValueError if a name is not one of
    '1m', '2m' or 'coded'.
    """
    requested = phys.lower()
    if requested == '*':
        return None

    phy_by_name = {
        '1m': HCI_LE_1M_PHY,
        '2m': HCI_LE_2M_PHY,
        'coded': HCI_LE_CODED_PHY,
    }
    selected = []
    for name in requested.split(','):
        if name not in phy_by_name:
            raise ValueError('invalid PHY name')
        selected.append(phy_by_name[name])
    return selected
# -----------------------------------------------------------------------------
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
connected_peer: Optional[Peer]
def __init__(self):
    """Set up the application state and build the full-screen UI."""
    # ---- Application state ----
    self.known_addresses = set()
    self.known_remote_attributes = []
    self.known_local_attributes = []
    self.device = None
    self.connected_peer = None
    self.top_tab = 'device'
    self.monitor_rssi = False
    self.connection_rssi = None

    ui_style = Style.from_dict(
        {
            'output-field': 'bg:#000044 #ffffff',
            'input-field': 'bg:#000000 #ffffff',
            'line': '#004400',
            'error': 'fg:ansired',
        }
    )

    class LiveCompleter(Completer):
        """Completer over a collection that may grow after construction."""

        def __init__(self, words):
            self.words = words

        def get_completions(self, document, complete_event):
            prefix = document.text_before_cursor.upper()
            # Snapshot the matches first, the collection may change later
            candidates = [
                word for word in self.words if word.upper().startswith(prefix)
            ]
            for candidate in candidates:
                yield Completion(candidate, start_position=-len(prefix))

    # ---- Command completion ----
    command_completer = NestedCompleter.from_nested_dict(
        {
            'scan': {'on': None, 'off': None, 'clear': None},
            'advertise': {'on': None, 'off': None},
            'rssi': {'on': None, 'off': None},
            'show': {
                'scan': None,
                'log': None,
                'device': None,
                'local-services': None,
                'remote-services': None,
                'local-values': None,
                'remote-values': None,
            },
            'filter': {
                'address': None,
            },
            'connect': LiveCompleter(self.known_addresses),
            'update-parameters': None,
            'encrypt': None,
            'disconnect': None,
            'discover': {'services': None, 'attributes': None},
            'request-mtu': None,
            'read': LiveCompleter(self.known_remote_attributes),
            'write': LiveCompleter(self.known_remote_attributes),
            'local-write': LiveCompleter(self.known_local_attributes),
            'subscribe': LiveCompleter(self.known_remote_attributes),
            'unsubscribe': LiveCompleter(self.known_remote_attributes),
            'set-phy': {'1m': None, '2m': None, 'coded': None},
            'set-default-phy': None,
            'quit': None,
            'exit': None,
        }
    )

    # ---- Input line ----
    history_path = os.path.join(BUMBLE_USER_DIR, 'history')
    self.input_field = TextArea(
        height=1,
        prompt="> ",
        multiline=False,
        wrap_lines=False,
        completer=command_completer,
        history=FileHistory(history_path),
    )
    self.input_field.accept_handler = self.accept_input

    # ---- Command output area (cursor pinned to the last line) ----
    self.output_height = Dimension(min=7, max=7, weight=1)
    self.output_max_lines = 20
    self.output_lines = []
    self.output = FormattedTextControl(
        get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1))
    )

    # ---- Log area (cursor pinned to the last line) ----
    self.log_height = Dimension(min=7, weight=4)
    self.log_max_lines = 100
    self.log_lines = []
    self.log_text = FormattedTextControl(
        get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
    )

    # ---- Content of the other top panes ----
    self.scan_results_text = FormattedTextControl()
    self.local_services_text = FormattedTextControl()
    self.remote_services_text = FormattedTextControl()
    self.device_text = FormattedTextControl()
    self.local_values_text = FormattedTextControl()
    self.remote_values_text = FormattedTextControl()

    def tab_pane(control, title, tab_name, **window_options):
        # A framed window that is only shown while `tab_name` is selected
        return ConditionalContainer(
            Frame(Window(control, **window_options), title=title),
            filter=Condition(lambda: self.top_tab == tab_name),
        )

    # ---- Layout ----
    body = HSplit(
        [
            tab_pane(self.scan_results_text, 'Scan Results', 'scan'),
            tab_pane(self.local_services_text, 'Local Services', 'local-services'),
            tab_pane(self.local_values_text, 'Local Values', 'local-values'),
            tab_pane(
                self.remote_services_text, 'Remote Services', 'remote-services'
            ),
            tab_pane(self.remote_values_text, 'Remote Values', 'remote-values'),
            tab_pane(self.log_text, 'Log', 'log', height=self.log_height),
            tab_pane(self.device_text, 'Device', 'device'),
            Frame(Window(self.output, height=self.output_height)),
            FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
            self.input_field,
        ]
    )
    completions_float = Float(
        xcursor=True,
        ycursor=True,
        content=CompletionsMenu(max_height=16, scroll_offset=1),
    )
    root = FloatContainer(body, floats=[completions_float])
    layout = Layout(root, focused_element=self.input_field)

    # ---- Key bindings: Ctrl-C and Ctrl-Q both leave the application ----
    key_bindings = KeyBindings()

    @key_bindings.add("c-c")
    @key_bindings.add("c-q")
    def _(event):
        event.app.exit()

    # pylint: disable=invalid-name
    self.ui = Application(
        layout=layout, style=ui_style, key_bindings=key_bindings, full_screen=True
    )
async def run_async(self, device_config, transport):
    """Open the transport, power on the device and run the UI until it exits.

    Args:
        device_config: path of a device configuration file, or a falsy value
            to create a device named 'Bumble' with a generated static random
            address.
        transport: transport specification given to open_transport_or_link.
    """
    rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
    try:
        async with await open_transport_or_link(transport) as (
            hci_source,
            hci_sink,
        ):
            if device_config:
                self.device = Device.from_config_file_with_hci(
                    device_config, hci_source, hci_sink
                )
            else:
                random_address = (
                    f"{random.randint(192,255):02X}"  # address is static random
                )
                for random_byte in random.sample(range(255), 5):
                    random_address += f":{random_byte:02X}"
                self.append_to_log(f"Setting random address: {random_address}")
                self.device = Device.with_hci(
                    'Bumble', random_address, hci_source, hci_sink
                )
            self.device.listener = DeviceListener(self)
            await self.device.power_on()
            self.show_device(self.device)
            self.show_local_services(self.device.gatt_server.attributes)

            # Run the UI
            await self.ui.run_async()
    finally:
        # Always stop the background RSSI poller, including when opening the
        # transport, powering on or the UI itself raised
        rssi_monitoring_task.cancel()
def add_known_address(self, address):
self.known_addresses.add(address)
def accept_input(self, _):
if len(self.input_field.text) == 0:
return
self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False)
self.ui.create_background_task(self.command(self.input_field.text))
def get_status_bar_text(self):
scanning = "ON" if self.device and self.device.is_scanning else "OFF"
connection_state = 'NONE'
encryption_state = ''
att_mtu = ''
rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
if self.device:
if self.device.is_le_connecting:
connection_state = 'CONNECTING'
elif self.connected_peer:
connection = self.connected_peer.connection
connection_parameters = (
f'{connection.parameters.connection_interval}/'
f'{connection.parameters.peripheral_latency}/'
f'{connection.parameters.supervision_timeout}'
)
if connection.transport == BT_LE_TRANSPORT:
phy_state = (
f' RX={le_phy_name(connection.phy.rx_phy)}/'
f'TX={le_phy_name(connection.phy.tx_phy)}'
)
else:
phy_state = ''
connection_state = (
f'{connection.peer_address} '
f'{connection_parameters} '
f'{connection.data_length}'
f'{phy_state}'
)
encryption_state = (
'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
)
att_mtu = f'ATT_MTU: {connection.att_mtu}'
return [
('ansigreen', f' SCAN: {scanning} '),
('', ' '),
('ansiblue', f' CONNECTION: {connection_state} '),
('', ' '),
('ansimagenta', f' {encryption_state} '),
('', ' '),
('ansicyan', f' {att_mtu} '),
('', ' '),
('ansiyellow', f' {rssi} '),
]
def show_error(self, title, details=None):
appended = [('class:error', title)]
if details:
appended.append(('', f' {details}'))
self.append_to_output(appended)
def show_scan_results(self, scan_results):
    """Render the first scan results into the 'scan' pane and redraw."""
    max_lines = 40  # TEMP
    shown_keys = list(scan_results.keys())[:max_lines]
    rendered = [scan_results[key].to_display_string() for key in shown_keys]
    self.scan_results_text.text = ANSI('\n'.join(rendered))
    self.ui.invalidate()
def show_remote_services(self, services):
lines = []
del self.known_remote_attributes[:]
for service in services:
lines.append(("ansicyan", f"{service}\n"))
for characteristic in service.characteristics:
lines.append(('ansimagenta', f' {characteristic} + \n'))
self.known_remote_attributes.append(
f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
)
self.known_remote_attributes.append(
f'*.{characteristic.uuid.to_hex_str()}'
)
self.known_remote_attributes.append(f'#{characteristic.handle:X}')
for descriptor in characteristic.descriptors:
lines.append(("ansigreen", f" {descriptor}\n"))
self.remote_services_text.text = lines
self.ui.invalidate()
def show_local_services(self, attributes):
    """Render the local GATT attributes and refresh the completion names.

    Attributes are walked in order; the most recent service and
    characteristic declaration seen are used to build the dotted names of
    the attributes that follow them.
    """
    rendered = []
    known = self.known_local_attributes
    del known[:]  # cleared in place: the completer shares this list

    for attribute in attributes:
        if isinstance(attribute, Service):
            # Remember the enclosing service for the entries that follow
            service = attribute
            rendered.append(("ansicyan", f"{attribute}\n"))
            continue

        if isinstance(attribute, Characteristic):
            # Already covered by its CharacteristicDeclaration, skip it
            continue

        if isinstance(attribute, CharacteristicDeclaration):
            # Remember the declaration for the descriptors that follow
            characteristic_declaration = attribute
            service_hex = service.uuid.to_hex_str()
            characteristic_hex = attribute.characteristic.uuid.to_hex_str()
            known.append(f'{service_hex}.{characteristic_hex}')
            known.append(f'#{attribute.characteristic.handle:X}')
            rendered.append(("ansimagenta", f" {attribute}\n"))
            continue

        if isinstance(attribute, Descriptor):
            service_hex = service.uuid.to_hex_str()
            characteristic_hex = (
                characteristic_declaration.characteristic.uuid.to_hex_str()
            )
            descriptor_hex = attribute.type.to_hex_str()
            known.append(f'{service_hex}.{characteristic_hex}.{descriptor_hex}')
            known.append(f'#{attribute.handle:X}')
            rendered.append(("ansigreen", f" {attribute}\n"))
            continue

        # Any other kind of attribute
        rendered.append(("ansiyellow", f"{attribute}\n"))

    self.local_services_text.text = rendered
    self.ui.invalidate()
def show_device(self, device):
    """Render the device's properties into the 'device' pane and redraw."""
    properties = [
        ('Bumble Version: ', __version__),
        ('Name: ', device.name),
        ('Public Address: ', device.public_address),
        ('Random Address: ', device.random_address),
        ('LE Enabled: ', device.le_enabled),
        ('Classic Enabled: ', device.classic_enabled),
        ('Classic SC Enabled: ', device.classic_sc_enabled),
        ('Classic SSP Enabled: ', device.classic_ssp_enabled),
        ('Classic Class: ', device.class_of_device),
        ('Discoverable: ', device.discoverable),
        ('Connectable: ', device.connectable),
        ('Advertising Data: ', device.advertising_data),
        ('Scan Response Data: ', device.scan_response_data),
    ]

    # Show a single value when min and max are equal, a range otherwise
    if device.advertising_interval_min == device.advertising_interval_max:
        advertising_interval = device.advertising_interval_min
    else:
        advertising_interval = (
            f'{device.advertising_interval_min} to '
            f'{device.advertising_interval_max}'
        )
    properties.append(('Advertising Interval: ', advertising_interval))

    fragments = []
    for label, value in properties:
        fragments.append(('ansicyan', label))
        fragments.append(('', f'{value}\n'))

    self.device_text.text = fragments
    self.ui.invalidate()
def append_to_output(self, line, invalidate=True):
if isinstance(line, str):
line = [('', line)]
self.output_lines = self.output_lines[-self.output_max_lines :]
self.output_lines.append(line)
formatted_text = []
for line in self.output_lines:
formatted_text += line
formatted_text.append(('', '\n'))
self.output.text = formatted_text
if invalidate:
self.ui.invalidate()
def append_to_log(self, lines, invalidate=True):
    """Append text (possibly several newline-separated lines) to the log pane.

    Only the most recent `log_max_lines` lines are kept.
    """
    combined = self.log_lines + lines.split('\n')
    self.log_lines = combined[-self.log_max_lines :]
    self.log_text.text = ANSI('\n'.join(self.log_lines))
    if invalidate:
        self.ui.invalidate()
async def discover_services(self):
if not self.connected_peer:
self.show_error('not connected')
return
# Discover all services, characteristics and descriptors
self.append_to_output('discovering services...')
await self.connected_peer.discover_services()
self.append_to_output(
f'found {len(self.connected_peer.services)} services,'
' discovering characteristics...'
)
await self.connected_peer.discover_characteristics()
self.append_to_output('found characteristics, discovering descriptors...')
for service in self.connected_peer.services:
for characteristic in service.characteristics:
await self.connected_peer.discover_descriptors(characteristic)
self.append_to_output('discovery completed')
self.show_remote_services(self.connected_peer.services)
async def discover_attributes(self):
if not self.connected_peer:
self.show_error('not connected')
return
# Discover all attributes
self.append_to_output('discovering attributes...')
attributes = await self.connected_peer.discover_attributes()
self.append_to_output(f'discovered {len(attributes)} attributes...')
self.show_attributes(attributes)
def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
if not self.connected_peer:
return None
parts = param.split('.')
if len(parts) == 2:
service_uuid = UUID(parts[0]) if parts[0] != '*' else None
characteristic_uuid = UUID(parts[1])
for service in self.connected_peer.services:
if service_uuid is None or service.uuid == service_uuid:
for characteristic in service.characteristics:
if characteristic.uuid == characteristic_uuid:
return characteristic
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
for service in self.connected_peer.services:
for characteristic in service.characteristics:
if characteristic.handle == attribute_handle:
return characteristic
return None
def find_local_attribute(
self, param
) -> Optional[Union[Characteristic, Descriptor]]:
parts = param.split('.')
if len(parts) == 3:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
descriptor_uuid = UUID(parts[2])
return self.device.gatt_server.get_descriptor_attribute(
service_uuid, characteristic_uuid, descriptor_uuid
)
if len(parts) == 2:
service_uuid = UUID(parts[0])
characteristic_uuid = UUID(parts[1])
characteristic_attributes = (
self.device.gatt_server.get_characteristic_attributes(
service_uuid, characteristic_uuid
)
)
if characteristic_attributes:
return characteristic_attributes[1]
return None
elif len(parts) == 1:
if parts[0].startswith('#'):
attribute_handle = int(f'{parts[0][1:]}', 16)
attribute = self.device.gatt_server.get_attribute(attribute_handle)
if isinstance(attribute, (Characteristic, Descriptor)):
return attribute
return None
return None
async def rssi_monitor_loop(self):
    """Poll the RSSI of the current connection while monitoring is enabled.

    Runs until cancelled. Every RSSI_MONITOR_INTERVAL seconds, if monitoring
    is on and a peer is connected, the RSSI is read and stored in
    `connection_rssi` for the status bar.
    """
    while True:
        peer = self.connected_peer
        if self.monitor_rssi and peer:
            try:
                rssi = await peer.connection.get_rssi()
            except Exception as error:  # pylint: disable=broad-except
                # A failed read (e.g. the link went down meanwhile) must not
                # end the monitoring task for the rest of the session
                logging.getLogger(__name__).debug('RSSI read failed: %s', error)
            else:
                # Drop the reading if the peer changed while we were waiting
                if self.connected_peer is peer:
                    self.connection_rssi = rssi
                    self.ui.invalidate()
        await asyncio.sleep(RSSI_MONITOR_INTERVAL)
async def command(self, command):
try:
(keyword, *params) = command.strip().split(' ')
keyword = keyword.replace('-', '_').lower()
handler = getattr(self, f'do_{keyword}', None)
if handler:
await handler(params)
self.ui.invalidate()
else:
self.show_error('unknown command', keyword)
except Exception as error:
self.show_error(str(error))
async def do_scan(self, params):
if len(params) == 0:
# Toggle scanning
if self.device.is_scanning:
await self.device.stop_scanning()
else:
await self.device.start_scanning()
elif params[0] == 'on':
if len(params) == 2:
if not params[1].startswith("filter="):
self.show_error(
'invalid syntax',
'expected address filter=key1:value1,key2:value,... '
'available filters: address',
)
# regex: (word):(any char except ,)
matches = re.findall(r"(\w+):([^,]+)", params[1])
for match in matches:
if match[0] == "address":
self.device.listener.address_filter = match[1]
await self.device.start_scanning()
self.top_tab = 'scan'
elif params[0] == 'off':
await self.device.stop_scanning()
elif params[0] == 'clear':
self.device.listener.scan_results.clear()
self.known_addresses.clear()
self.show_scan_results(self.device.listener.scan_results)
else:
self.show_error('unsupported arguments for scan command')
async def do_rssi(self, params):
if len(params) == 0:
# Toggle monitoring
self.monitor_rssi = not self.monitor_rssi
elif params[0] == 'on':
self.monitor_rssi = True
elif params[0] == 'off':
self.monitor_rssi = False
else:
self.show_error('unsupported arguments for rssi command')
async def do_connect(self, params):
if len(params) != 1 and len(params) != 2:
self.show_error('invalid syntax', 'expected connect <address> [phys]')
return
if len(params) == 1:
phys = None
else:
phys = parse_phys(params[1])
if phys is None:
connection_parameters_preferences = None
else:
connection_parameters_preferences = {
phy: ConnectionParametersPreferences() for phy in phys
}
if self.device.is_scanning:
await self.device.stop_scanning()
self.append_to_output('connecting...')
try:
await self.device.connect(
params[0],
connection_parameters_preferences=connection_parameters_preferences,
timeout=DEFAULT_CONNECTION_TIMEOUT,
)
self.top_tab = 'services'
except bumble.core.TimeoutError:
self.show_error('connection timed out')
async def do_disconnect(self, _):
if self.device.is_le_connecting:
await self.device.cancel_connection()
else:
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.connection.disconnect()
async def do_update_parameters(self, params):
if len(params) != 1 or len(params[0].split('/')) != 3:
self.show_error(
'invalid syntax',
'expected update-parameters <interval-min>-<interval-max>'
'/<max-latency>/<supervision>',
)
return
if not self.connected_peer:
self.show_error('not connected')
return
connection_intervals, max_latency, supervision_timeout = params[0].split('/')
connection_interval_min, connection_interval_max = [
int(x) for x in connection_intervals.split('-')
]
max_latency = int(max_latency)
supervision_timeout = int(supervision_timeout)
await self.connected_peer.connection.update_parameters(
connection_interval_min,
connection_interval_max,
max_latency,
supervision_timeout,
)
async def do_encrypt(self, _):
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.connection.encrypt()
async def do_advertise(self, params):
if len(params) == 0:
# Toggle advertising
if self.device.is_advertising:
await self.device.stop_advertising()
else:
await self.device.start_advertising()
elif params[0] == 'on':
await self.device.start_advertising()
elif params[0] == 'off':
await self.device.stop_advertising()
else:
self.show_error('unsupported arguments for advertise command')
async def do_show(self, params):
if params:
if params[0] in {
'scan',
'log',
'device',
'local-services',
'remote-services',
'local-values',
'remote-values',
}:
self.top_tab = params[0]
self.ui.invalidate()
while self.top_tab == 'local-values':
await self.do_show_local_values()
await asyncio.sleep(1)
while self.top_tab == 'remote-values':
await self.do_show_remote_values()
await asyncio.sleep(1)
async def do_show_local_values(self):
    """Render a table of the local characteristic and descriptor values.

    There is one value column per connection, or a single 'Value' column
    when there is no connection.
    """
    table = PrettyTable()
    gatt_server = self.device.gatt_server

    columns = ["Service", "Characteristic", "Descriptor"]
    # if there's no connections, add a column just for value
    if not self.device.connections:
        columns.append("Value")
    # if there are connections, add a column for each connection's value
    columns.extend(
        f"Connection {connection.handle}"
        for connection in self.device.connections.values()
    )

    def read_for_all_connections(attribute):
        # One value per connection, or the connection-less value if none
        per_connection = [
            attribute.read_value(connection)
            for connection in self.device.connections.values()
        ]
        return per_connection if per_connection else [attribute.read_value(None)]

    for attribute in gatt_server.attributes:
        if isinstance(attribute, Characteristic):
            service = gatt_server.get_attribute_group(attribute.handle, Service)
            if not service:
                continue
            values = read_for_all_connections(attribute)
            table.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
        elif isinstance(attribute, Descriptor):
            service = gatt_server.get_attribute_group(attribute.handle, Service)
            if not service:
                continue
            characteristic = gatt_server.get_attribute_group(
                attribute.handle, Characteristic
            )
            if not characteristic:
                continue
            values = read_for_all_connections(attribute)
            # TODO: future optimization: convert CCCD value to human readable string
            table.add_row(
                [service.uuid, characteristic.uuid, attribute.type] + values
            )

    table.field_names = columns
    self.local_values_text.text = table.get_string()
    self.ui.invalidate()
async def do_show_remote_values(self):
    """Render a table of the values cached by each connection's GATT client."""
    table = PrettyTable(
        field_names=[
            "Connection",
            "Service",
            "Characteristic",
            "Descriptor",
            "Time",
            "Value",
        ]
    )

    for connection in self.device.connections.values():
        client = connection.gatt_client
        for handle, (time, value) in client.cached_values.items():
            row = [connection.handle]
            attribute = client.get_attributes(handle)
            if not attribute:
                continue

            # `attribute` holds 1 to 3 elements; the missing levels are
            # shown as empty cells
            depth = len(attribute)
            if depth == 3:
                identity = [attribute[0].uuid, attribute[1].uuid, attribute[2].type]
            elif depth == 2:
                identity = [attribute[0].uuid, attribute[1].uuid, ""]
            elif depth == 1:
                identity = [attribute[0].uuid, "", ""]
            else:
                continue

            row.extend(identity)
            row.extend([humanize.naturaltime(time), value])
            table.add_row(row)

    self.remote_values_text.text = table.get_string()
    self.ui.invalidate()
async def do_get_phy(self, _):
if not self.connected_peer:
self.show_error('not connected')
return
phy = await self.connected_peer.connection.get_phy()
self.append_to_output(
f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
f'TX={HCI_Constant.le_phy_name(phy[1])}'
)
async def do_request_mtu(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected request-mtu <mtu>')
return
if not self.connected_peer:
self.show_error('not connected')
return
await self.connected_peer.request_mtu(int(params[0]))
async def do_discover(self, params):
if not params:
self.show_error('invalid syntax', 'expected discover services|attributes')
return
discovery_type = params[0]
if discovery_type == 'services':
await self.discover_services()
elif discovery_type == 'attributes':
await self.discover_attributes()
async def do_read(self, params):
if len(params) != 1:
self.show_error('invalid syntax', 'expected read <attribute>')
return
if not self.connected_peer:
self.show_error('not connected')
return
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
value = await characteristic.read_value()
self.append_to_output(f'VALUE: 0x{value.hex()}')
async def do_write(self, params):
    """Handle `write <attribute> <value>`: write to a remote characteristic.

    `<value>` is interpreted, in this order, as:
      - a hex string when it starts with '0x' (e.g. 0x0102),
      - a decimal integer, sent as 2 bytes in little-endian order, which is
        the same encoding as the one used by `local-write`,
      - text, encoded with the default string encoding.

    A write with response is used when the characteristic has the WRITE
    property.
    """
    if not self.connected_peer:
        self.show_error('not connected')
        return

    if len(params) != 2:
        self.show_error('invalid syntax', 'expected write <attribute> <value>')
        return

    if params[1].upper().startswith("0X"):
        value = bytes.fromhex(params[1][2:])  # parse as hex string
    else:
        try:
            # The integer was previously passed on as an `int` object instead
            # of being serialized to bytes like every other form of value
            value = int(params[1]).to_bytes(2, "little")  # try as 2 byte integer
        except ValueError:
            value = str.encode(params[1])  # must be a string

    characteristic = self.find_remote_characteristic(params[0])
    if characteristic is None:
        self.show_error('no such characteristic')
        return

    # use write with response if supported
    with_response = bool(
        characteristic.properties & Characteristic.Properties.WRITE
    )
    await characteristic.write_value(value, with_response=with_response)
async def do_local_write(self, params):
if len(params) != 2:
self.show_error(
'invalid syntax', 'expected local-write <attribute> <value>'
)
return
if params[1].upper().startswith("0X"):
value = bytes.fromhex(params[1][2:]) # parse as hex string
else:
try:
value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer
except ValueError:
value = str.encode(params[1]) # must be a string
attribute = self.find_local_attribute(params[0])
if not attribute:
self.show_error('invalid syntax', 'unable to find attribute')
return
# send data to any subscribers
if isinstance(attribute, Characteristic):
attribute.write_value(None, value)
if attribute.has_properties(Characteristic.NOTIFY):
await self.device.gatt_server.notify_subscribers(attribute)
if attribute.has_properties(Characteristic.INDICATE):
await self.device.gatt_server.indicate_subscribers(attribute)
async def do_subscribe(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 1:
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
await characteristic.subscribe(
lambda value: self.append_to_output(
f"{characteristic} VALUE: 0x{value.hex()}"
),
)
async def do_unsubscribe(self, params):
if not self.connected_peer:
self.show_error('not connected')
return
if len(params) != 1:
self.show_error('invalid syntax', 'expected subscribe <attribute>')
return
characteristic = self.find_remote_characteristic(params[0])
if characteristic is None:
self.show_error('no such characteristic')
return
await characteristic.unsubscribe()
async def do_set_phy(self, params):
if len(params) != 1:
self.show_error(
'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>'
)
return
if not self.connected_peer:
self.show_error('not connected')
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.connected_peer.connection.set_phy(
tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
)
async def do_set_default_phy(self, params):
if len(params) != 1:
self.show_error(
'invalid syntax',
'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>',
)
return
if '/' in params[0]:
tx_phys, rx_phys = params[0].split('/')
else:
tx_phys = params[0]
rx_phys = tx_phys
await self.device.set_default_phy(
tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
)
async def do_exit(self, _):
self.ui.exit()
async def do_quit(self, _):
self.ui.exit()
async def do_filter(self, params):
if params[0] == "address":
if len(params) != 2:
self.show_error('invalid syntax', 'expected filter address <pattern>')
return
self.device.listener.address_filter = params[1]
# -----------------------------------------------------------------------------
# Device and Connection Listener
# -----------------------------------------------------------------------------
class DeviceListener(Device.Listener, Connection.Listener):
    """Forwards device and connection events to the console application.

    It also owns the scan results (keyed by '<address>/<address type>', in
    order of first appearance) and the address filter applied to them.
    """

    def __init__(self, app):
        self.app = app
        self.scan_results = OrderedDict()
        # Goes through the property setter, which installs a match-all filter
        self.address_filter = None

    @property
    def address_filter(self):
        """The compiled regular expression that scan result addresses must match."""
        return self._address_filter

    @address_filter.setter
    def address_filter(self, filter_addr):
        # None means "no filtering"
        pattern = r".*" if filter_addr is None else filter_addr
        self._address_filter = re.compile(pattern)

        # Drop the results that no longer match. The entries must be rebuilt
        # as (key, result) pairs: filtering the mapping itself only yields
        # its keys, from which a dictionary cannot be built. The address is
        # matched in the same form as for incoming advertisements.
        self.scan_results = OrderedDict(
            (key, result)
            for key, result in self.scan_results.items()
            if self.filter_address_match(str(result.address))
        )
        self.app.show_scan_results(self.scan_results)

    def filter_address_match(self, address):
        """
        Returns true if an address matches the filter
        """
        return bool(self.address_filter.match(address))

    @AsyncRunner.run_in_task()
    # pylint: disable=invalid-overridden-method
    async def on_connection(self, connection):
        """Record the new peer and start listening to the connection's events."""
        self.app.connected_peer = Peer(connection)
        self.app.connection_rssi = None
        self.app.append_to_output(f'connected to {self.app.connected_peer}')
        connection.listener = self

    def on_disconnection(self, reason):
        """Report the disconnection and forget the peer and its RSSI."""
        self.app.append_to_output(
            f'disconnected from {self.app.connected_peer}, '
            f'reason: {HCI_Constant.error_name(reason)}'
        )
        self.app.connected_peer = None
        self.app.connection_rssi = None

    def on_connection_parameters_update(self):
        """Report the connection's new parameters."""
        self.app.append_to_output(
            f'connection parameters update: '
            f'{self.app.connected_peer.connection.parameters}'
        )

    def on_connection_phy_update(self):
        """Report the connection's new PHY."""
        self.app.append_to_output(
            f'connection phy update: {self.app.connected_peer.connection.phy}'
        )

    def on_connection_att_mtu_update(self):
        """Report the connection's new ATT MTU."""
        self.app.append_to_output(
            f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}'
        )

    def on_connection_encryption_change(self):
        """Report whether the connection is now encrypted."""
        encryption_state = (
            'encrypted'
            if self.app.connected_peer.connection.is_encrypted
            else 'not encrypted'
        )
        self.app.append_to_output(
            'connection encryption change: ' f'{encryption_state}'
        )

    def on_connection_data_length_change(self):
        """Report the connection's new data length."""
        self.app.append_to_output(
            'connection data length change: '
            f'{self.app.connected_peer.connection.data_length}'
        )

    def on_advertisement(self, advertisement):
        """Add or update the scan result for an advertisement that passes the filter."""
        if not self.filter_address_match(str(advertisement.address)):
            return

        entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
        entry = self.scan_results.get(entry_key)
        if entry:
            # Known device, refresh its data
            entry.ad_data = advertisement.data
            entry.rssi = advertisement.rssi
            entry.connectable = advertisement.is_connectable
        else:
            # New device, make its address available for completion
            self.app.add_known_address(str(advertisement.address))
            self.scan_results[entry_key] = ScanResult(
                advertisement.address,
                advertisement.address.address_type,
                advertisement.data,
                advertisement.rssi,
                advertisement.is_connectable,
            )

        self.app.show_scan_results(self.scan_results)
# -----------------------------------------------------------------------------
# Scanning
# -----------------------------------------------------------------------------
class ScanResult:
    """One entry of the scan results: the latest data seen for an address."""

    def __init__(self, address, address_type, ad_data, rssi, connectable):
        self.address, self.address_type = address, address_type
        self.ad_data = ad_data
        self.rssi = rssi
        self.connectable = connectable

    def to_display_string(self):
        """Format the entry as one colored line: address, type, RSSI bar, name."""
        type_labels = ('P', 'R', 'PI', 'RI')
        type_label = type_labels[self.address_type]

        # Yellow for connectable devices, red for the others
        address_color = colors.yellow if self.connectable else colors.red
        # Green for the 'P' types, cyan for the 'R' types
        type_color = colors.green if type_label.startswith('P') else colors.cyan

        # Prefer the complete local name, fall back to the shortened one
        raw_name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
        if raw_name is None:
            raw_name = self.ad_data.get(
                AdvertisingData.SHORTENED_LOCAL_NAME, raw=True
            )

        if not raw_name:
            name = ''
        else:
            # Convert to string, or to hex if the bytes cannot be decoded
            try:
                name = raw_name.decode()
            except UnicodeDecodeError:
                name = raw_name.hex()

        # Remove any '/P' qualifier suffix from the address string
        address_text = self.address.to_string(with_type_qualifier=False)

        # RSSI bar, padded so that the names line up
        bar = rssi_bar(self.rssi)
        padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar))

        colored_address = address_color(address_text)
        colored_type = type_color(type_label)
        return f'{colored_address} [{colored_type}] ' f'{bar} {padding} {name}'
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
class LogHandler(logging.Handler):
    """Logging handler that sends formatted records to the app's log pane."""

    def __init__(self, app):
        super().__init__()
        self.app = app
        line_format = '[%(asctime)s][%(levelname)s] %(message)s'
        self.setFormatter(logging.Formatter(line_format))

    def emit(self, record):
        self.app.append_to_log(self.format(record))
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
@click.command()
@click.option('--device-config', help='Device configuration file')
@click.argument('transport')
def main(device_config, transport):
    # Command line entry point: create the console application, route all
    # logging to its log pane and run it until the user exits.
    #
    # (Kept as comments rather than a docstring, because click would show a
    # docstring as the command's help text.)

    # Ensure that the BUMBLE_USER_DIR directory exists.
    # makedirs(exist_ok=True) does not fail if the directory appears between
    # a check and the creation, and also creates missing parent directories.
    os.makedirs(BUMBLE_USER_DIR, exist_ok=True)

    # Create an instance of the app
    app = ConsoleApp()

    # Setup logging
    # logging.basicConfig(level = 'FATAL')
    # logging.basicConfig(level = 'DEBUG')
    root_logger = logging.getLogger()
    root_logger.addHandler(LogHandler(app))
    root_logger.setLevel(logging.DEBUG)

    # Run until the user exits
    asyncio.run(app.run_async(device_config, transport))
# -----------------------------------------------------------------------------
# Run the command line entry point when this file is executed as a script.
# click supplies main()'s parameters from the command line, hence the pylint
# exemption below.
if __name__ == "__main__":
    main() # pylint: disable=no-value-for-parameter