| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import asyncio |
| import os |
| import logging |
| import click |
| import aioconsole |
| from colors import color |
| |
| from bumble.device import Device, Peer |
| from bumble.transport import open_transport_or_link |
| from bumble.smp import PairingDelegate, PairingConfig |
| from bumble.smp import error_name as smp_error_name |
| from bumble.keys import JsonKeyStore |
| from bumble.core import ProtocolError |
| from bumble.gatt import ( |
| GATT_DEVICE_NAME_CHARACTERISTIC, |
| GATT_GENERIC_ACCESS_SERVICE, |
| Service, |
| Characteristic, |
| CharacteristicValue |
| ) |
| from bumble.att import ( |
| ATT_Error, |
| ATT_INSUFFICIENT_AUTHENTICATION_ERROR, |
| ATT_INSUFFICIENT_ENCRYPTION_ERROR |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| class Delegate(PairingDelegate): |
| def __init__(self, mode, connection, capability_string, prompt): |
| super().__init__({ |
| 'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY, |
| 'display': PairingDelegate.DISPLAY_OUTPUT_ONLY, |
| 'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, |
| 'display+yes/no': PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, |
| 'none': PairingDelegate.NO_OUTPUT_NO_INPUT |
| }[capability_string.lower()]) |
| |
| self.mode = mode |
| self.peer = Peer(connection) |
| self.peer_name = None |
| self.prompt = prompt |
| |
| async def update_peer_name(self): |
| if self.peer_name is not None: |
| # We already asked the peer |
| return |
| |
| # Try to get the peer's name |
| if self.peer: |
| peer_name = await get_peer_name(self.peer, self.mode) |
| self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]' |
| else: |
| self.peer_name = '[?]' |
| |
| async def accept(self): |
| if self.prompt: |
| await self.update_peer_name() |
| |
| # Wait a bit to allow some of the log lines to print before we prompt |
| await asyncio.sleep(1) |
| |
| # Prompt for acceptance |
| print(color('###-----------------------------------', 'yellow')) |
| print(color(f'### Pairing request from {self.peer_name}', 'yellow')) |
| print(color('###-----------------------------------', 'yellow')) |
| while True: |
| response = await aioconsole.ainput(color('>>> Accept? ', 'yellow')) |
| response = response.lower().strip() |
| if response == 'yes': |
| return True |
| elif response == 'no': |
| return False |
| else: |
| # Accept silently |
| return True |
| |
| async def compare_numbers(self, number, digits): |
| await self.update_peer_name() |
| |
| # Wait a bit to allow some of the log lines to print before we prompt |
| await asyncio.sleep(1) |
| |
| # Prompt for a numeric comparison |
| print(color('###-----------------------------------', 'yellow')) |
| print(color(f'### Pairing with {self.peer_name}', 'yellow')) |
| print(color('###-----------------------------------', 'yellow')) |
| while True: |
| response = await aioconsole.ainput(color(f'>>> Does the other device display {number:0{digits}}? ', 'yellow')) |
| response = response.lower().strip() |
| if response == 'yes': |
| return True |
| elif response == 'no': |
| return False |
| |
| async def get_number(self): |
| await self.update_peer_name() |
| |
| # Wait a bit to allow some of the log lines to print before we prompt |
| await asyncio.sleep(1) |
| |
| # Prompt for a PIN |
| while True: |
| try: |
| print(color('###-----------------------------------', 'yellow')) |
| print(color(f'### Pairing with {self.peer_name}', 'yellow')) |
| print(color('###-----------------------------------', 'yellow')) |
| return int(await aioconsole.ainput(color('>>> Enter PIN: ', 'yellow'))) |
| except ValueError: |
| pass |
| |
| async def display_number(self, number, digits): |
| await self.update_peer_name() |
| |
| # Wait a bit to allow some of the log lines to print before we prompt |
| await asyncio.sleep(1) |
| |
| # Display a PIN code |
| print(color('###-----------------------------------', 'yellow')) |
| print(color(f'### Pairing with {self.peer_name}', 'yellow')) |
| print(color(f'### PIN: {number:0{digits}}', 'yellow')) |
| print(color('###-----------------------------------', 'yellow')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def get_peer_name(peer, mode): |
| if mode == 'classic': |
| return await peer.request_name() |
| else: |
| # Try to get the peer name from GATT |
| services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE) |
| if not services: |
| return None |
| |
| values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0]) |
| if values: |
| return values[0].decode('utf-8') |
| |
| |
| # ----------------------------------------------------------------------------- |
| AUTHENTICATION_ERROR_RETURNED = [False, False] |
| |
| |
| def read_with_error(connection): |
| if not connection.is_encrypted: |
| raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) |
| |
| if AUTHENTICATION_ERROR_RETURNED[0]: |
| return bytes([1]) |
| else: |
| AUTHENTICATION_ERROR_RETURNED[0] = True |
| raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) |
| |
| |
| def write_with_error(connection, value): |
| if not connection.is_encrypted: |
| raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) |
| |
| if not AUTHENTICATION_ERROR_RETURNED[1]: |
| AUTHENTICATION_ERROR_RETURNED[1] = True |
| raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def on_connection(connection, request): |
| print(color(f'<<< Connection: {connection}', 'green')) |
| |
| # Listen for pairing events |
| connection.on('pairing_start', on_pairing_start) |
| connection.on('pairing', on_pairing) |
| connection.on('pairing_failure', on_pairing_failure) |
| |
| # Listen for encryption changes |
| connection.on( |
| 'connection_encryption_change', |
| lambda: on_connection_encryption_change(connection) |
| ) |
| |
| # Request pairing if needed |
| if request: |
| print(color('>>> Requesting pairing', 'green')) |
| connection.request_pairing() |
| |
| |
| # ----------------------------------------------------------------------------- |
| def on_connection_encryption_change(connection): |
| print(color('@@@-----------------------------------', 'blue')) |
| print(color(f'@@@ Connection is {"" if connection.is_encrypted else "not"}encrypted', 'blue')) |
| print(color('@@@-----------------------------------', 'blue')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def on_pairing_start(): |
| print(color('***-----------------------------------', 'magenta')) |
| print(color('*** Pairing starting', 'magenta')) |
| print(color('***-----------------------------------', 'magenta')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def on_pairing(keys): |
| print(color('***-----------------------------------', 'cyan')) |
| print(color('*** Paired!', 'cyan')) |
| keys.print(prefix=color('*** ', 'cyan')) |
| print(color('***-----------------------------------', 'cyan')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def on_pairing_failure(reason): |
| print(color('***-----------------------------------', 'red')) |
| print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red')) |
| print(color('***-----------------------------------', 'red')) |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def pair( |
| mode, |
| sc, |
| mitm, |
| bond, |
| io, |
| prompt, |
| request, |
| print_keys, |
| keystore_file, |
| device_config, |
| hci_transport, |
| address_or_name |
| ): |
| print('<<< connecting to HCI...') |
| async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink): |
| print('<<< connected') |
| |
| # Create a device to manage the host |
| device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) |
| |
| # Set a custom keystore if specified on the command line |
| if keystore_file: |
| device.keystore = JsonKeyStore(namespace=None, filename=keystore_file) |
| |
| # Print the existing keys before pairing |
| if print_keys and device.keystore: |
| print(color('@@@-----------------------------------', 'blue')) |
| print(color('@@@ Pairing Keys:', 'blue')) |
| await device.keystore.print(prefix=color('@@@ ', 'blue')) |
| print(color('@@@-----------------------------------', 'blue')) |
| |
| # Expose a GATT characteristic that can be used to trigger pairing by |
| # responding with an authentication error when read |
| if mode == 'le': |
| device.add_service( |
| Service( |
| '50DB505C-8AC4-4738-8448-3B1D9CC09CC5', |
| [ |
| Characteristic( |
| '552957FB-CF1F-4A31-9535-E78847E1A714', |
| Characteristic.READ | Characteristic.WRITE, |
| Characteristic.READABLE | Characteristic.WRITEABLE, |
| CharacteristicValue(read=read_with_error, write=write_with_error) |
| ) |
| ] |
| ) |
| ) |
| |
| # Select LE or Classic |
| if mode == 'classic': |
| device.classic_enabled = True |
| device.le_enabled = False |
| |
| # Get things going |
| await device.power_on() |
| |
| # Set up a pairing config factory |
| device.pairing_config_factory = lambda connection: PairingConfig( |
| sc, |
| mitm, |
| bond, |
| Delegate(mode, connection, io, prompt) |
| ) |
| |
| # Connect to a peer or wait for a connection |
| device.on('connection', lambda connection: on_connection(connection, request)) |
| if address_or_name is not None: |
| print(color(f'=== Connecting to {address_or_name}...', 'green')) |
| connection = await device.connect(address_or_name) |
| |
| if not request: |
| try: |
| if mode == 'le': |
| await connection.pair() |
| else: |
| await connection.authenticate() |
| return |
| except ProtocolError as error: |
| print(color(f'Pairing failed: {error}', 'red')) |
| return |
| else: |
| # Advertise so that peers can find us and connect |
| await device.start_advertising(auto_restart=True) |
| |
| await hci_source.wait_for_termination() |
| |
| |
| # ----------------------------------------------------------------------------- |
| @click.command() |
| @click.option('--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True) |
| @click.option('--sc', type=bool, default=True, help='Use the Secure Connections protocol', show_default=True) |
| @click.option('--mitm', type=bool, default=True, help='Request MITM protection', show_default=True) |
| @click.option('--bond', type=bool, default=True, help='Enable bonding', show_default=True) |
| @click.option('--io', type=click.Choice(['keyboard', 'display', 'display+keyboard', 'display+yes/no', 'none']), default='display+keyboard', show_default=True) |
| @click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request') |
| @click.option('--request', is_flag=True, help='Request that the connecting peer initiate pairing') |
| @click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing') |
| @click.option('--keystore-file', help='File in which to store the pairing keys') |
| @click.argument('device-config') |
| @click.argument('hci_transport') |
| @click.argument('address-or-name', required=False) |
| def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name): |
| logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) |
| asyncio.run(pair(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name)) |
| |
| |
| # ----------------------------------------------------------------------------- |
| if __name__ == '__main__': |
| main() |