| # Copyright 2021-2023 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| from __future__ import annotations |
| import asyncio |
| import asyncio.subprocess |
| from importlib import resources |
| import enum |
| import json |
| import os |
| import logging |
| import pathlib |
| import subprocess |
| from typing import Dict, List, Optional |
| import weakref |
| |
| import click |
| import aiohttp |
| from aiohttp import web |
| |
| import bumble |
| from bumble.colors import color |
| from bumble.core import BT_BR_EDR_TRANSPORT, CommandTimeoutError |
| from bumble.device import Connection, Device, DeviceConfiguration |
| from bumble.hci import HCI_StatusError |
| from bumble.pairing import PairingConfig |
| from bumble.sdp import ServiceAttribute |
| from bumble.transport import open_transport |
| from bumble.avdtp import ( |
| AVDTP_AUDIO_MEDIA_TYPE, |
| Listener, |
| MediaCodecCapabilities, |
| MediaPacket, |
| Protocol, |
| ) |
| from bumble.a2dp import ( |
| MPEG_2_AAC_LC_OBJECT_TYPE, |
| make_audio_sink_service_sdp_records, |
| A2DP_SBC_CODEC_TYPE, |
| A2DP_MPEG_2_4_AAC_CODEC_TYPE, |
| SBC_MONO_CHANNEL_MODE, |
| SBC_DUAL_CHANNEL_MODE, |
| SBC_SNR_ALLOCATION_METHOD, |
| SBC_LOUDNESS_ALLOCATION_METHOD, |
| SBC_STEREO_CHANNEL_MODE, |
| SBC_JOINT_STEREO_CHANNEL_MODE, |
| SbcMediaCodecInformation, |
| AacMediaCodecInformation, |
| ) |
| from bumble.utils import AsyncRunner |
| from bumble.codecs import AacAudioRtpPacket |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Logging |
| # ----------------------------------------------------------------------------- |
# Module-level logger, named after this module
logger = logging.getLogger(__name__)
| |
| |
| # ----------------------------------------------------------------------------- |
| # Constants |
| # ----------------------------------------------------------------------------- |
# Default value of the `--ui-port` option (HTTP port for the UI server)
DEFAULT_UI_PORT = 7654
| |
| # ----------------------------------------------------------------------------- |
class AudioExtractor:
    """Interface of the objects that extract audio data from RTP media packets."""

    @staticmethod
    def create(codec: str):
        """Create the extractor that matches a codec name.

        Args:
            codec: Codec name, either 'aac' or 'sbc'.

        Returns:
            A new `AacAudioExtractor` or `SbcAudioExtractor` instance.

        Raises:
            ValueError: If `codec` is not one of the supported names.
        """
        if codec == 'aac':
            return AacAudioExtractor()
        if codec == 'sbc':
            return SbcAudioExtractor()

        # Fail right away: falling through would return None, and the problem
        # would only show up later as an AttributeError on the first packet.
        raise ValueError(f'unsupported codec: {codec}')

    def extract_audio(self, packet: MediaPacket) -> bytes:
        """Return the audio bytes carried by `packet`.

        Raises:
            NotImplementedError: Always; concrete extractors provide the logic.
        """
        raise NotImplementedError()
| |
| |
| # ----------------------------------------------------------------------------- |
class AacAudioExtractor:
    """Audio extractor for AAC streams."""

    def extract_audio(self, packet: MediaPacket) -> bytes:
        """Wrap the packet payload in an `AacAudioRtpPacket` and return its ADTS form."""
        rtp_packet = AacAudioRtpPacket(packet.payload)
        return rtp_packet.to_adts()
| |
| |
| # ----------------------------------------------------------------------------- |
class SbcAudioExtractor:
    """Audio extractor for SBC streams."""

    def extract_audio(self, packet: MediaPacket) -> bytes:
        """Return the packet payload without its first byte.

        The first payload byte is a header that is skipped, not interpreted.
        For reference, its fields would be read as:
            fragmented       = header >> 7
            start            = (header >> 6) & 0x01
            last             = (header >> 5) & 0x01
            number_of_frames = header & 0x0F
        """
        # TODO: support fragmented payloads
        payload = packet.payload
        return payload[1:]
| |
| |
| # ----------------------------------------------------------------------------- |
class Output:
    """Base class for audio outputs: every hook does nothing by default."""

    async def start(self) -> None:
        """Hook for the start of the stream."""

    async def stop(self) -> None:
        """Hook for the end of the stream."""

    async def suspend(self) -> None:
        """Hook for the suspension of the stream."""

    async def on_connection(self, connection: Connection) -> None:
        """Hook for a new Bluetooth connection."""

    async def on_disconnection(self, reason: int) -> None:
        """Hook for the loss of the Bluetooth connection."""

    def on_rtp_packet(self, packet: MediaPacket) -> None:
        """Hook for each received RTP media packet (synchronous)."""
| |
| |
| # ----------------------------------------------------------------------------- |
class FileOutput(Output):
    """Output that writes the extracted audio to a file."""

    filename: str
    codec: str
    extractor: AudioExtractor

    def __init__(self, filename, codec):
        """Open `filename` for binary writing and create the extractor for `codec`."""
        self.filename = filename
        self.codec = codec
        self.file = open(filename, 'wb')
        self.extractor = AudioExtractor.create(codec)

    async def stop(self) -> None:
        """Close the file, which also flushes the buffered audio to disk."""
        if not self.file.closed:
            self.file.close()

    def on_rtp_packet(self, packet: MediaPacket) -> None:
        """Append the audio extracted from `packet` to the file."""
        if self.file.closed:
            # Packets are arriving again after a stop: reopen in append mode so
            # that what was written before the stop is kept.
            self.file = open(self.filename, 'ab')
        self.file.write(self.extractor.extract_audio(packet))
| |
| |
| # ----------------------------------------------------------------------------- |
class QueuedOutput(Output):
    """Output that queues extracted audio and delivers it from a background task.

    Subclasses override `on_audio_packet` to consume the queued audio.
    """

    MAX_QUEUE_SIZE = 32768

    packets: asyncio.Queue
    extractor: AudioExtractor
    packet_pump_task: Optional[asyncio.Task]
    started: bool

    def __init__(self, extractor):
        self.extractor = extractor
        self.packets = asyncio.Queue()
        self.packet_pump_task = None
        self.started = False

    async def start(self):
        """Start the packet pump task, unless it was already started."""
        if self.started:
            return

        # Record the start. Without this, every call would spawn one more pump
        # task, all of them taking packets from the same queue.
        self.started = True
        self.packet_pump_task = asyncio.create_task(self.pump_packets())

    async def pump_packets(self):
        """Deliver the queued audio packets, one at a time, forever."""
        while True:
            packet = await self.packets.get()
            await self.on_audio_packet(packet)

    async def on_audio_packet(self, packet: bytes) -> None:
        """Consume one extracted audio packet (does nothing by default)."""

    def on_rtp_packet(self, packet: MediaPacket) -> None:
        """Extract the audio from `packet` and queue it, or drop it if the queue is full."""
        if self.packets.qsize() > self.MAX_QUEUE_SIZE:
            logger.debug("queue full, dropping")
            return

        self.packets.put_nowait(self.extractor.extract_audio(packet))
| |
| |
| # ----------------------------------------------------------------------------- |
class WebSocketOutput(QueuedOutput):
    """Queued output that relays the audio and the stream events to two callbacks.

    `send_audio` receives each audio packet; `send_message` receives a message
    type followed by keyword parameters.
    """

    def __init__(self, codec, send_audio, send_message):
        extractor = AudioExtractor.create(codec)
        super().__init__(extractor)
        self.send_audio = send_audio
        self.send_message = send_message

    async def on_connection(self, connection: Connection) -> None:
        """Send a 'connection' message with the peer's address and name."""
        try:
            await connection.request_remote_name()
        except HCI_StatusError:
            # The request failed: report whatever name is already known.
            pass

        name = connection.peer_name
        if name is None:
            name = ''
        address = connection.peer_address.to_string(False)
        await self.send_message('connection', peer_address=address, peer_name=name)

    async def on_disconnection(self, reason) -> None:
        """Send a 'disconnection' message (the reason is not forwarded)."""
        await self.send_message('disconnection')

    async def on_audio_packet(self, packet: bytes) -> None:
        """Forward one audio packet to `send_audio`."""
        await self.send_audio(packet)

    async def start(self):
        """Run the inherited start logic, then send a 'start' message."""
        await super().start()
        await self.send_message('start')

    async def stop(self):
        """Run the inherited stop logic, then send a 'stop' message."""
        await super().stop()
        await self.send_message('stop')

    async def suspend(self):
        """Run the inherited suspend logic, then send a 'suspend' message."""
        await super().suspend()
        await self.send_message('suspend')
| |
| |
| # ----------------------------------------------------------------------------- |
class FfplayOutput(QueuedOutput):
    """Queued output that plays the audio by piping it to an `ffplay` subprocess."""

    MAX_QUEUE_SIZE = 32768

    subprocess: Optional[asyncio.subprocess.Process]
    ffplay_task: Optional[asyncio.Task]

    def __init__(self, codec: str) -> None:
        super().__init__(AudioExtractor.create(codec))
        self.subprocess = None
        self.ffplay_task = None
        self.codec = codec

    async def start(self):
        """Launch ffplay and start the packet pump; later calls do nothing.

        Raises:
            OSError: If the `ffplay` executable cannot be launched.
        """
        if self.started:
            return

        # Launch ffplay before the pump starts, so that the pump never runs
        # without a pipe to write to. The arguments are passed as a list: no
        # shell is involved in running the command.
        self.subprocess = await asyncio.create_subprocess_exec(
            'ffplay',
            '-f',
            self.codec,
            'pipe:0',
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        await super().start()

        # Set only after the inherited start has run, so that later calls return
        # early instead of launching another ffplay.
        self.started = True

        self.ffplay_task = asyncio.create_task(self.monitor_ffplay())

    async def stop(self):
        # TODO
        pass

    async def suspend(self):
        # TODO
        pass

    async def monitor_ffplay(self):
        """Log what ffplay writes to stdout/stderr until the subprocess is done."""

        async def read_stream(name, stream):
            while True:
                # Read in chunks so that the output is logged as it is produced.
                data = await stream.read(4096)
                if not data:
                    # End of stream. Reads at EOF return immediately, so looping
                    # on would spin without ever yielding to the event loop.
                    break
                logger.debug('%s: %s', name, data)

        await asyncio.wait(
            [
                asyncio.create_task(
                    read_stream('[ffplay stdout]', self.subprocess.stdout)
                ),
                asyncio.create_task(
                    read_stream('[ffplay stderr]', self.subprocess.stderr)
                ),
                asyncio.create_task(self.subprocess.wait()),
            ]
        )
        logger.debug("FFPLAY done")

    async def on_audio_packet(self, packet):
        """Write one audio packet to ffplay's standard input (best effort)."""
        try:
            self.subprocess.stdin.write(packet)
            # Wait for the pipe buffer to drain when it is full, rather than
            # letting it grow without bound.
            await self.subprocess.stdin.drain()
        except Exception as error:
            # Playback is best effort: report the failure and keep going.
            logger.warning(
                '!!!! exception while sending audio to ffplay pipe: %s', error
            )
| |
| |
| # ----------------------------------------------------------------------------- |
class UiServer:
    """HTTP server for the speaker UI, with a WebSocket channel to the UI page."""

    speaker: weakref.ReferenceType[Speaker]
    port: int

    def __init__(self, speaker: Speaker, port: int) -> None:
        # Only a weak reference to the speaker is kept; it is resolved on use.
        self.speaker = weakref.ref(speaker)
        self.port = port
        # WebSocket currently used as the channel, or None when there is none
        self.channel_socket = None

    async def start_http(self) -> None:
        """Start the UI HTTP server."""

        app = web.Application()
        app.add_routes(
            [
                web.get('/', self.get_static),
                web.get('/speaker.html', self.get_static),
                web.get('/speaker.js', self.get_static),
                web.get('/speaker.css', self.get_static),
                web.get('/logo.svg', self.get_static),
                web.get('/channel', self.get_channel),
            ]
        )

        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, 'localhost', self.port)
        print('UI HTTP server at ' + color(f'http://127.0.0.1:{self.port}', 'green'))
        await site.start()

    async def get_static(self, request):
        """Serve a static UI file read from the `bumble.apps.speaker` package.

        '/' is served as '/speaker.html'; the content type is chosen from the
        file extension, with 'text/plain' as the fallback.
        """
        path = request.path
        if path == '/':
            path = '/speaker.html'
        if path.endswith('.html'):
            content_type = 'text/html'
        elif path.endswith('.js'):
            content_type = 'text/javascript'
        elif path.endswith('.css'):
            content_type = 'text/css'
        elif path.endswith('.svg'):
            content_type = 'image/svg+xml'
        else:
            content_type = 'text/plain'
        text = (
            resources.files("bumble.apps.speaker")
            .joinpath(pathlib.Path(path).relative_to('/'))
            .read_text(encoding="utf-8")
        )
        return aiohttp.web.Response(text=text, content_type=content_type)

    async def get_channel(self, request):
        """Serve the WebSocket channel: dispatch its messages until it closes."""
        ws = web.WebSocketResponse()
        await ws.prepare(request)

        # Process messages until the socket is closed.
        self.channel_socket = ws
        try:
            async for message in ws:
                if message.type == aiohttp.WSMsgType.TEXT:
                    logger.debug(f'<<< received message: {message.data}')
                    await self.on_message(message.data)
                elif message.type == aiohttp.WSMsgType.ERROR:
                    logger.debug(
                        f'channel connection closed with exception {ws.exception()}'
                    )
        finally:
            # Also reached when a handler raises: never keep a socket that is no
            # longer served. Leave a newer socket alone if one took over.
            if self.channel_socket is ws:
                self.channel_socket = None
            logger.debug('--- channel connection closed')

        return ws

    async def on_message(self, message_str: str):
        """Parse a JSON message and call the matching `on_<type>_message` method.

        The message is an object with a 'type' and optional 'params', which are
        passed to the handler as keyword arguments. Malformed messages and
        unknown types are logged and ignored.
        """
        # Parse the message as JSON
        try:
            message = json.loads(message_str)
            message_type = message['type']
            message_params = message.get('params', {})
        except (ValueError, KeyError, TypeError, AttributeError) as error:
            logger.warning(f'ignoring malformed message: {error!r}')
            return

        # Dispatch the message
        # (the None default is what makes unknown types detectable here)
        handler = getattr(self, f'on_{message_type}_message', None)
        if handler is None:
            logger.warning(f'ignoring message with unknown type: {message_type}')
            return

        await handler(**message_params)

    async def on_hello_message(self):
        """Reply to 'hello' with the version, codec and stream state.

        A 'connection' message follows when the speaker has a connection.
        """
        await self.send_message(
            'hello',
            bumble_version=bumble.__version__,
            codec=self.speaker().codec,
            streamState=self.speaker().stream_state.name,
        )
        if connection := self.speaker().connection:
            await self.send_message(
                'connection',
                peer_address=connection.peer_address.to_string(False),
                peer_name=connection.peer_name,
            )

    async def send_message(self, message_type: str, **kwargs) -> None:
        """Send a JSON message on the channel; do nothing if there is no channel."""
        if self.channel_socket is None:
            return

        message = {'type': message_type, 'params': kwargs}
        await self.channel_socket.send_json(message)

    async def send_audio(self, data: bytes) -> None:
        """Send audio bytes on the channel; failures are logged, not raised."""
        if self.channel_socket is None:
            return

        try:
            await self.channel_socket.send_bytes(data)
        except Exception as error:
            logger.warning(f'exception while sending audio packet: {error}')
| |
| |
| # ----------------------------------------------------------------------------- |
class Speaker:
    """Bluetooth audio sink that forwards the received audio to a set of outputs."""

    class StreamState(enum.Enum):
        IDLE = 0
        STOPPED = 1
        STARTED = 2
        SUSPENDED = 3

    def __init__(self, device_config, transport, codec, discover, outputs, ui_port):
        """Create the outputs and the UI server.

        Args:
            device_config: Device configuration file name; when falsy, built-in
                defaults are used instead.
            transport: Transport specification, passed to `open_transport`.
            codec: Codec name, 'aac' or 'sbc'.
            discover: Whether to discover the remote endpoints once connected.
            outputs: Output names. '@ffplay' selects the ffplay output; any other
                name is used as the name of a file to write.
            ui_port: Port for the UI HTTP server.
        """
        self.device_config = device_config
        self.transport = transport
        self.codec = codec
        self.discover = discover
        self.ui_port = ui_port
        self.device = None
        self.connection = None
        self.listener = None
        self.packets_received = 0
        self.bytes_received = 0
        self.stream_state = Speaker.StreamState.IDLE
        self.outputs = []
        for output in outputs:
            if output == '@ffplay':
                self.outputs.append(FfplayOutput(codec))
                continue

            # Default to FileOutput
            self.outputs.append(FileOutput(output, codec))

        # Create an HTTP server for the UI
        self.ui_server = UiServer(speaker=self, port=ui_port)

    def sdp_records(self) -> Dict[int, List[ServiceAttribute]]:
        """Return the SDP records for the audio sink service, keyed by handle."""
        service_record_handle = 0x00010001
        return {
            service_record_handle: make_audio_sink_service_sdp_records(
                service_record_handle
            )
        }

    def codec_capabilities(self) -> MediaCodecCapabilities:
        """Return the capabilities for the configured codec.

        Raises:
            RuntimeError: If the codec is neither 'aac' nor 'sbc'.
        """
        if self.codec == 'aac':
            return self.aac_codec_capabilities()

        if self.codec == 'sbc':
            return self.sbc_codec_capabilities()

        raise RuntimeError('unsupported codec')

    def aac_codec_capabilities(self) -> MediaCodecCapabilities:
        """Return the AAC capabilities advertised by the sink."""
        return MediaCodecCapabilities(
            media_type=AVDTP_AUDIO_MEDIA_TYPE,
            media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE,
            media_codec_information=AacMediaCodecInformation.from_lists(
                object_types=[MPEG_2_AAC_LC_OBJECT_TYPE],
                sampling_frequencies=[48000, 44100],
                channels=[1, 2],
                vbr=1,
                bitrate=256000,
            ),
        )

    def sbc_codec_capabilities(self) -> MediaCodecCapabilities:
        """Return the SBC capabilities advertised by the sink."""
        return MediaCodecCapabilities(
            media_type=AVDTP_AUDIO_MEDIA_TYPE,
            media_codec_type=A2DP_SBC_CODEC_TYPE,
            media_codec_information=SbcMediaCodecInformation.from_lists(
                sampling_frequencies=[48000, 44100, 32000, 16000],
                channel_modes=[
                    SBC_MONO_CHANNEL_MODE,
                    SBC_DUAL_CHANNEL_MODE,
                    SBC_STEREO_CHANNEL_MODE,
                    SBC_JOINT_STEREO_CHANNEL_MODE,
                ],
                block_lengths=[4, 8, 12, 16],
                subbands=[4, 8],
                allocation_methods=[
                    SBC_LOUDNESS_ALLOCATION_METHOD,
                    SBC_SNR_ALLOCATION_METHOD,
                ],
                minimum_bitpool_value=2,
                maximum_bitpool_value=53,
            ),
        )

    async def dispatch_to_outputs(self, function):
        """Call and await `function(output)` for each output, in order."""
        for output in self.outputs:
            await function(output)

    def on_bluetooth_connection(self, connection):
        """Remember the connection and notify the outputs."""
        print(f'Connection: {connection}')
        self.connection = connection
        connection.on('disconnection', self.on_bluetooth_disconnection)
        AsyncRunner.spawn(
            self.dispatch_to_outputs(lambda output: output.on_connection(connection))
        )

    def on_bluetooth_disconnection(self, reason):
        """Forget the connection, advertise again and notify the outputs."""
        print(f'Disconnection ({reason})')
        self.connection = None
        AsyncRunner.spawn(self.advertise())
        AsyncRunner.spawn(
            self.dispatch_to_outputs(lambda output: output.on_disconnection(reason))
        )

    def on_avdtp_connection(self, protocol):
        """Add a sink endpoint to a new AVDTP connection and listen to its events."""
        print('Audio Stream Open')

        # Add a sink endpoint to the server
        sink = protocol.add_sink(self.codec_capabilities())
        sink.on('start', self.on_sink_start)
        sink.on('stop', self.on_sink_stop)
        sink.on('suspend', self.on_sink_suspend)
        sink.on('configuration', lambda: self.on_sink_configuration(sink.configuration))
        sink.on('rtp_packet', self.on_rtp_packet)
        sink.on('rtp_channel_open', self.on_rtp_channel_open)
        sink.on('rtp_channel_close', self.on_rtp_channel_close)

        # Listen for close events
        protocol.on('close', self.on_avdtp_close)

        # Discover all endpoints on the remote device if requested
        if self.discover:
            AsyncRunner.spawn(self.discover_remote_endpoints(protocol))

    def on_avdtp_close(self):
        print("Audio Stream Closed")

    def on_sink_start(self):
        """Mark the stream as started and start the outputs."""
        print("Sink Started\u001b[0K")
        self.stream_state = self.StreamState.STARTED
        AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.start()))

    def on_sink_stop(self):
        """Mark the stream as stopped and stop the outputs."""
        print("Sink Stopped\u001b[0K")
        self.stream_state = self.StreamState.STOPPED
        AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.stop()))

    def on_sink_suspend(self):
        """Mark the stream as suspended and suspend the outputs."""
        print("Sink Suspended\u001b[0K")
        self.stream_state = self.StreamState.SUSPENDED
        AsyncRunner.spawn(self.dispatch_to_outputs(lambda output: output.suspend()))

    def on_sink_configuration(self, config):
        """Print each capability of the sink configuration on its own line."""
        print("Sink Configuration:")
        print('\n'.join(" " + str(capability) for capability in config))

    def on_rtp_channel_open(self):
        print("RTP Channel Open")

    def on_rtp_channel_close(self):
        """Return the stream state to idle."""
        print("RTP Channel Closed")
        self.stream_state = self.StreamState.IDLE

    def on_rtp_packet(self, packet):
        """Update the counters, print a status line and pass the packet to the outputs."""
        self.packets_received += 1
        self.bytes_received += len(packet.payload)
        # The trailing '\r' makes each status line overwrite the previous one
        print(
            f'[{self.bytes_received} bytes in {self.packets_received} packets] {packet}',
            end='\r',
        )

        for output in self.outputs:
            output.on_rtp_packet(packet)

    async def advertise(self):
        """Make the device discoverable and connectable."""
        await self.device.set_discoverable(True)
        await self.device.set_connectable(True)

    async def connect(self, address):
        """Connect to `address`, authenticate, encrypt and open an AVDTP connection."""
        # Connect to the source
        print(f'=== Connecting to {address}...')
        connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT)
        print(f'=== Connected to {connection.peer_address}')

        # Request authentication
        print('*** Authenticating...')
        await connection.authenticate()
        print('*** Authenticated')

        # Enable encryption
        print('*** Enabling encryption...')
        await connection.encrypt()
        print('*** Encryption on')

        protocol = await Protocol.connect(connection)
        self.listener.set_server(connection, protocol)
        self.on_avdtp_connection(protocol)

    async def discover_remote_endpoints(self, protocol):
        """Discover the remote endpoints and print them."""
        endpoints = await protocol.discover_remote_endpoints()
        print(f'@@@ Found {len(endpoints)} endpoints')
        for endpoint in endpoints:
            print('@@@', endpoint)

    async def run(self, connect_address):
        """Run the speaker until the transport terminates.

        Args:
            connect_address: Address or name to connect to; when falsy, the
                speaker waits for an incoming connection instead.
        """
        await self.ui_server.start_http()
        self.outputs.append(
            WebSocketOutput(
                self.codec, self.ui_server.send_audio, self.ui_server.send_message
            )
        )

        try:
            async with await open_transport(self.transport) as (hci_source, hci_sink):
                # Create a device
                device_config = DeviceConfiguration()
                if self.device_config:
                    device_config.load_from_file(self.device_config)
                else:
                    device_config.name = "Bumble Speaker"
                    device_config.class_of_device = 0x240414
                    device_config.keystore = "JsonKeyStore"

                device_config.classic_enabled = True
                device_config.le_enabled = False
                self.device = Device.from_config_with_hci(
                    device_config, hci_source, hci_sink
                )

                # Setup the SDP to expose the sink service
                self.device.sdp_service_records = self.sdp_records()

                # Don't require MITM when pairing.
                self.device.pairing_config_factory = lambda connection: PairingConfig(
                    mitm=False
                )

                # Start the controller
                await self.device.power_on()

                # Print some of the config/properties
                print("Speaker Name:", color(device_config.name, 'yellow'))
                print(
                    "Speaker Bluetooth Address:",
                    color(
                        self.device.public_address.to_string(with_type_qualifier=False),
                        'yellow',
                    ),
                )

                # Listen for Bluetooth connections
                self.device.on('connection', self.on_bluetooth_connection)

                # Create a listener to wait for AVDTP connections
                self.listener = Listener(Listener.create_registrar(self.device))
                self.listener.on('connection', self.on_avdtp_connection)

                print(f'Speaker ready to play, codec={color(self.codec, "cyan")}')

                if connect_address:
                    # Connect to the source
                    try:
                        await self.connect(connect_address)
                    except CommandTimeoutError:
                        print(color("Connection timed out", "red"))
                        return
                else:
                    # Start being discoverable and connectable
                    print("Waiting for connection...")
                    await self.advertise()

                await hci_source.wait_for_termination()
        finally:
            # Stop the outputs on every exit path, including the early return
            # on a connection timeout and any error raised above.
            for output in self.outputs:
                await output.stop()
| |
| |
| # ----------------------------------------------------------------------------- |
@click.group()
@click.pass_context
def speaker_cli(ctx, device_config=None):
    # Store the device configuration in the context object, for sub-commands.
    # `device_config` has a default because no option declared on this group
    # supplies it: without one, invoking the group would fail with a
    # missing-argument TypeError.
    ctx.ensure_object(dict)
    ctx.obj['device_config'] = device_config
| |
| |
@click.command()
@click.option(
    '--codec', type=click.Choice(['sbc', 'aac']), default='aac', show_default=True
)
@click.option(
    '--discover', is_flag=True, help='Discover remote endpoints once connected'
)
@click.option(
    '--output',
    multiple=True,
    metavar='NAME',
    help=(
        'Send audio to this named output '
        '(may be used more than once for multiple outputs)'
    ),
)
@click.option(
    '--ui-port',
    'ui_port',
    metavar='HTTP_PORT',
    default=DEFAULT_UI_PORT,
    show_default=True,
    help='HTTP port for the UI server',
)
@click.option(
    '--connect',
    'connect_address',
    metavar='ADDRESS_OR_NAME',
    help='Address or name to connect to',
)
@click.option('--device-config', metavar='FILENAME', help='Device configuration file')
@click.argument('transport')
def speaker(
    transport, codec, connect_address, discover, output, ui_port, device_config
):
    """Run the speaker."""

    if '@ffplay' in output:
        # Check if ffplay is installed and can actually be run. A failing
        # `ffplay -version` is treated like a missing ffplay, rather than
        # crashing the command with an unhandled CalledProcessError.
        try:
            subprocess.run(['ffplay', '-version'], capture_output=True, check=True)
        except (OSError, subprocess.CalledProcessError):
            print(
                color('ffplay not installed, @ffplay output will be disabled', 'yellow')
            )
            output = [name for name in output if name != '@ffplay']

    asyncio.run(
        Speaker(device_config, transport, codec, discover, output, ui_port).run(
            connect_address
        )
    )
| |
| |
| # ----------------------------------------------------------------------------- |
def main():
    """Set the log level from BUMBLE_LOGLEVEL (default: WARNING) and run the command."""
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'WARNING').upper()
    logging.basicConfig(level=log_level)
    speaker()
| |
| |
| # ----------------------------------------------------------------------------- |
# Run the command when this file is executed as a script
if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter