| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import pyee |
| |
| from bumble.device import Device |
| from bumble.hci import HCI_Reset_Command |
| |
| |
| # ----------------------------------------------------------------------------- |
| class Scanner(pyee.EventEmitter): |
| """ |
| Scanner web app |
| |
| Emitted events: |
| update: Emit when new `ScanEntry` are available. |
| """ |
| |
| class ScanEntry: |
| def __init__(self, advertisement): |
| self.address = advertisement.address.to_string(False) |
| self.address_type = ( |
| 'Public', |
| 'Random', |
| 'Public Identity', |
| 'Random Identity', |
| )[advertisement.address.address_type] |
| self.rssi = advertisement.rssi |
| self.data = advertisement.data.to_string('\n') |
| |
| def __init__(self, hci_source, hci_sink): |
| super().__init__() |
| self.device = Device.with_hci( |
| 'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink |
| ) |
| self.scan_entries = {} |
| self.device.on('advertisement', self.on_advertisement) |
| |
| async def start(self): |
| print('### Starting Scanner') |
| self.scan_entries = {} |
| self.emit('update', self.scan_entries) |
| await self.device.power_on() |
| await self.device.start_scanning() |
| print('### Scanner started') |
| |
| async def stop(self): |
| # TODO: replace this once a proper reset is implemented in the lib. |
| await self.device.host.send_command(HCI_Reset_Command()) |
| await self.device.power_off() |
| print('### Scanner stopped') |
| |
| def on_advertisement(self, advertisement): |
| self.scan_entries[advertisement.address] = self.ScanEntry(advertisement) |
| self.emit('update', self.scan_entries) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def main(hci_source, hci_sink): |
| return Scanner(hci_source, hci_sink) |