| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import asyncio |
| import os |
| import logging |
| from typing import Callable, Iterable, Optional |
| |
| import click |
| |
| from bumble.core import ProtocolError |
| from bumble.colors import color |
| from bumble.device import Device, Peer |
| from bumble.gatt import Service |
| from bumble.profiles.device_information_service import DeviceInformationServiceProxy |
| from bumble.profiles.battery_service import BatteryServiceProxy |
| from bumble.profiles.gap import GenericAccessServiceProxy |
| from bumble.profiles.tmap import TelephonyAndMediaAudioServiceProxy |
| from bumble.transport import open_transport_or_link |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def try_show(function: Callable, *args, **kwargs) -> None: |
| try: |
| await function(*args, **kwargs) |
| except ProtocolError as error: |
| print(color('ERROR:', 'red'), error) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def show_services(services: Iterable[Service]) -> None: |
| for service in services: |
| print(color(str(service), 'cyan')) |
| |
| for characteristic in service.characteristics: |
| print(color(' ' + str(characteristic), 'magenta')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def show_gap_information( |
| gap_service: GenericAccessServiceProxy, |
| ): |
| print(color('### Generic Access Profile', 'yellow')) |
| |
| if gap_service.device_name: |
| print( |
| color(' Device Name:', 'green'), |
| await gap_service.device_name.read_value(), |
| ) |
| |
| if gap_service.appearance: |
| print( |
| color(' Appearance: ', 'green'), |
| await gap_service.appearance.read_value(), |
| ) |
| |
| print() |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def show_device_information( |
| device_information_service: DeviceInformationServiceProxy, |
| ): |
| print(color('### Device Information', 'yellow')) |
| |
| if device_information_service.manufacturer_name: |
| print( |
| color(' Manufacturer Name:', 'green'), |
| await device_information_service.manufacturer_name.read_value(), |
| ) |
| |
| if device_information_service.model_number: |
| print( |
| color(' Model Number: ', 'green'), |
| await device_information_service.model_number.read_value(), |
| ) |
| |
| if device_information_service.serial_number: |
| print( |
| color(' Serial Number: ', 'green'), |
| await device_information_service.serial_number.read_value(), |
| ) |
| |
| if device_information_service.firmware_revision: |
| print( |
| color(' Firmware Revision:', 'green'), |
| await device_information_service.firmware_revision.read_value(), |
| ) |
| |
| print() |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def show_battery_level( |
| battery_service: BatteryServiceProxy, |
| ): |
| print(color('### Battery Information', 'yellow')) |
| |
| if battery_service.battery_level: |
| print( |
| color(' Battery Level:', 'green'), |
| await battery_service.battery_level.read_value(), |
| ) |
| |
| print() |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def show_tmas( |
| tmas: TelephonyAndMediaAudioServiceProxy, |
| ): |
| print(color('### Telephony And Media Audio Service', 'yellow')) |
| |
| if tmas.role: |
| print( |
| color(' Role:', 'green'), |
| await tmas.role.read_value(), |
| ) |
| |
| print() |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def show_device_info(peer, done: Optional[asyncio.Future]) -> None: |
| try: |
| # Discover all services |
| print(color('### Discovering Services and Characteristics', 'magenta')) |
| await peer.discover_services() |
| for service in peer.services: |
| await service.discover_characteristics() |
| |
| print(color('=== Services ===', 'yellow')) |
| show_services(peer.services) |
| print() |
| |
| if gap_service := peer.create_service_proxy(GenericAccessServiceProxy): |
| await try_show(show_gap_information, gap_service) |
| |
| if device_information_service := peer.create_service_proxy( |
| DeviceInformationServiceProxy |
| ): |
| await try_show(show_device_information, device_information_service) |
| |
| if battery_service := peer.create_service_proxy(BatteryServiceProxy): |
| await try_show(show_battery_level, battery_service) |
| |
| if tmas := peer.create_service_proxy(TelephonyAndMediaAudioServiceProxy): |
| await try_show(show_tmas, tmas) |
| |
| if done is not None: |
| done.set_result(None) |
| except asyncio.CancelledError: |
| print(color('!!! Operation canceled', 'red')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def async_main(device_config, encrypt, transport, address_or_name): |
| async with await open_transport_or_link(transport) as (hci_source, hci_sink): |
| |
| # Create a device |
| if device_config: |
| device = Device.from_config_file_with_hci( |
| device_config, hci_source, hci_sink |
| ) |
| else: |
| device = Device.with_hci( |
| 'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink |
| ) |
| await device.power_on() |
| |
| if address_or_name: |
| # Connect to the target peer |
| print(color('>>> Connecting...', 'green')) |
| connection = await device.connect(address_or_name) |
| print(color('>>> Connected', 'green')) |
| |
| # Encrypt the connection if required |
| if encrypt: |
| print(color('+++ Encrypting connection...', 'blue')) |
| await connection.encrypt() |
| print(color('+++ Encryption established', 'blue')) |
| |
| await show_device_info(Peer(connection), None) |
| else: |
| # Wait for a connection |
| done = asyncio.get_running_loop().create_future() |
| device.on( |
| 'connection', |
| lambda connection: asyncio.create_task( |
| show_device_info(Peer(connection), done) |
| ), |
| ) |
| await device.start_advertising(auto_restart=True) |
| |
| print(color('### Waiting for connection...', 'blue')) |
| await done |
| |
| |
| # ----------------------------------------------------------------------------- |
| @click.command() |
| @click.option('--device-config', help='Device configuration', type=click.Path()) |
| @click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False) |
| @click.argument('transport') |
| @click.argument('address-or-name', required=False) |
| def main(device_config, encrypt, transport, address_or_name): |
| """ |
| Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified, |
| wait for an incoming connection. |
| """ |
| logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) |
| asyncio.run(async_main(device_config, encrypt, transport, address_or_name)) |
| |
| |
| # ----------------------------------------------------------------------------- |
| if __name__ == '__main__': |
| main() |