blob: 2b8ce006208f73332498eac1ee1cbeeb1cae3ca5 [file] [log] [blame] [edit]
# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import logging
from typing import Dict, List
from bumble.core import BT_BR_EDR_TRANSPORT, CommandTimeoutError
from bumble.device import Device, DeviceConfiguration
from bumble.pairing import PairingConfig
from bumble.sdp import ServiceAttribute
from bumble.avdtp import (
AVDTP_AUDIO_MEDIA_TYPE,
Listener,
MediaCodecCapabilities,
MediaPacket,
Protocol,
)
from bumble.a2dp import (
make_audio_sink_service_sdp_records,
MPEG_2_AAC_LC_OBJECT_TYPE,
A2DP_SBC_CODEC_TYPE,
A2DP_MPEG_2_4_AAC_CODEC_TYPE,
SBC_MONO_CHANNEL_MODE,
SBC_DUAL_CHANNEL_MODE,
SBC_SNR_ALLOCATION_METHOD,
SBC_LOUDNESS_ALLOCATION_METHOD,
SBC_STEREO_CHANNEL_MODE,
SBC_JOINT_STEREO_CHANNEL_MODE,
SbcMediaCodecInformation,
AacMediaCodecInformation,
)
from bumble.utils import AsyncRunner
from bumble.codecs import AacAudioRtpPacket
from bumble.hci import HCI_Reset_Command
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class AudioExtractor:
@staticmethod
def create(codec: str):
if codec == 'aac':
return AacAudioExtractor()
if codec == 'sbc':
return SbcAudioExtractor()
def extract_audio(self, packet: MediaPacket) -> bytes:
raise NotImplementedError()
# -----------------------------------------------------------------------------
class AacAudioExtractor:
def extract_audio(self, packet: MediaPacket) -> bytes:
return AacAudioRtpPacket(packet.payload).to_adts()
# -----------------------------------------------------------------------------
class SbcAudioExtractor:
def extract_audio(self, packet: MediaPacket) -> bytes:
# header = packet.payload[0]
# fragmented = header >> 7
# start = (header >> 6) & 0x01
# last = (header >> 5) & 0x01
# number_of_frames = header & 0x0F
# TODO: support fragmented payloads
return packet.payload[1:]
# -----------------------------------------------------------------------------
class Speaker:
class StreamState(enum.Enum):
IDLE = 0
STOPPED = 1
STARTED = 2
SUSPENDED = 3
def __init__(self, hci_source, hci_sink, codec):
self.hci_source = hci_source
self.hci_sink = hci_sink
self.js_listeners = {}
self.codec = codec
self.device = None
self.connection = None
self.avdtp_listener = None
self.packets_received = 0
self.bytes_received = 0
self.stream_state = Speaker.StreamState.IDLE
self.audio_extractor = AudioExtractor.create(codec)
def sdp_records(self) -> Dict[int, List[ServiceAttribute]]:
service_record_handle = 0x00010001
return {
service_record_handle: make_audio_sink_service_sdp_records(
service_record_handle
)
}
def codec_capabilities(self) -> MediaCodecCapabilities:
if self.codec == 'aac':
return self.aac_codec_capabilities()
if self.codec == 'sbc':
return self.sbc_codec_capabilities()
raise RuntimeError('unsupported codec')
def aac_codec_capabilities(self) -> MediaCodecCapabilities:
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_MPEG_2_4_AAC_CODEC_TYPE,
media_codec_information=AacMediaCodecInformation.from_lists(
object_types=[MPEG_2_AAC_LC_OBJECT_TYPE],
sampling_frequencies=[48000, 44100],
channels=[1, 2],
vbr=1,
bitrate=256000,
),
)
def sbc_codec_capabilities(self) -> MediaCodecCapabilities:
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=SbcMediaCodecInformation.from_lists(
sampling_frequencies=[48000, 44100, 32000, 16000],
channel_modes=[
SBC_MONO_CHANNEL_MODE,
SBC_DUAL_CHANNEL_MODE,
SBC_STEREO_CHANNEL_MODE,
SBC_JOINT_STEREO_CHANNEL_MODE,
],
block_lengths=[4, 8, 12, 16],
subbands=[4, 8],
allocation_methods=[
SBC_LOUDNESS_ALLOCATION_METHOD,
SBC_SNR_ALLOCATION_METHOD,
],
minimum_bitpool_value=2,
maximum_bitpool_value=53,
),
)
def on_key_store_update(self):
print("Key Store updated")
self.emit('key_store_update')
def on_bluetooth_connection(self, connection):
print(f'Connection: {connection}')
self.connection = connection
connection.on('disconnection', self.on_bluetooth_disconnection)
peer_name = '' if connection.peer_name is None else connection.peer_name
peer_address = connection.peer_address.to_string(False)
self.emit('connection', {'peer_name': peer_name, 'peer_address': peer_address})
def on_bluetooth_disconnection(self, reason):
print(f'Disconnection ({reason})')
self.connection = None
self.emit('disconnection', None)
def on_avdtp_connection(self, protocol):
print('Audio Stream Open')
# Add a sink endpoint to the server
sink = protocol.add_sink(self.codec_capabilities())
sink.on('start', self.on_sink_start)
sink.on('stop', self.on_sink_stop)
sink.on('suspend', self.on_sink_suspend)
sink.on('configuration', lambda: self.on_sink_configuration(sink.configuration))
sink.on('rtp_packet', self.on_rtp_packet)
sink.on('rtp_channel_open', self.on_rtp_channel_open)
sink.on('rtp_channel_close', self.on_rtp_channel_close)
# Listen for close events
protocol.on('close', self.on_avdtp_close)
def on_avdtp_close(self):
print("Audio Stream Closed")
def on_sink_start(self):
print("Sink Started")
self.stream_state = self.StreamState.STARTED
self.emit('start', None)
def on_sink_stop(self):
print("Sink Stopped")
self.stream_state = self.StreamState.STOPPED
self.emit('stop', None)
def on_sink_suspend(self):
print("Sink Suspended")
self.stream_state = self.StreamState.SUSPENDED
self.emit('suspend', None)
def on_sink_configuration(self, config):
print("Sink Configuration:")
print('\n'.join([" " + str(capability) for capability in config]))
def on_rtp_channel_open(self):
print("RTP Channel Open")
def on_rtp_channel_close(self):
print("RTP Channel Closed")
self.stream_state = self.StreamState.IDLE
def on_rtp_packet(self, packet):
self.packets_received += 1
self.bytes_received += len(packet.payload)
self.emit("audio", self.audio_extractor.extract_audio(packet))
async def connect(self, address):
# Connect to the source
print(f'=== Connecting to {address}...')
connection = await self.device.connect(address, transport=BT_BR_EDR_TRANSPORT)
print(f'=== Connected to {connection.peer_address}')
# Request authentication
print('*** Authenticating...')
await connection.authenticate()
print('*** Authenticated')
# Enable encryption
print('*** Enabling encryption...')
await connection.encrypt()
print('*** Encryption on')
protocol = await Protocol.connect(connection)
self.avdtp_listener.set_server(connection, protocol)
self.on_avdtp_connection(protocol)
async def discover_remote_endpoints(self, protocol):
endpoints = await protocol.discover_remote_endpoints()
print(f'@@@ Found {len(endpoints)} endpoints')
for endpoint in endpoints:
print('@@@', endpoint)
def on(self, event_name, listener):
self.js_listeners[event_name] = listener
def emit(self, event_name, event=None):
if listener := self.js_listeners.get(event_name):
listener(event)
async def run(self, connect_address):
# Create a device
device_config = DeviceConfiguration()
device_config.name = "Bumble Speaker"
device_config.class_of_device = 0x240414
device_config.keystore = "JsonKeyStore:/bumble/keystore.json"
device_config.classic_enabled = True
device_config.le_enabled = False
self.device = Device.from_config_with_hci(
device_config, self.hci_source, self.hci_sink
)
# Setup the SDP to expose the sink service
self.device.sdp_service_records = self.sdp_records()
# Don't require MITM when pairing.
self.device.pairing_config_factory = lambda connection: PairingConfig(
mitm=False
)
# Start the controller
await self.device.power_on()
# Listen for Bluetooth connections
self.device.on('connection', self.on_bluetooth_connection)
# Listen for changes to the key store
self.device.on('key_store_update', self.on_key_store_update)
# Create a listener to wait for AVDTP connections
self.avdtp_listener = Listener.for_device(self.device)
self.avdtp_listener.on('connection', self.on_avdtp_connection)
print(f'Speaker ready to play, codec={self.codec}')
if connect_address:
# Connect to the source
try:
await self.connect(connect_address)
except CommandTimeoutError:
print("Connection timed out")
return
else:
# We'll wait for a connection
print("Waiting for connection...")
async def start(self):
await self.run(None)
async def stop(self):
# TODO: replace this once a proper reset is implemented in the lib.
await self.device.host.send_command(HCI_Reset_Command())
await self.device.power_off()
print('Speaker stopped')
# -----------------------------------------------------------------------------
def main(hci_source, hci_sink):
return Speaker(hci_source, hci_sink, "aac")