blob: e49953154a866fba23ed8dc6b4b02c31b91758b9 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import pytest
from bumble.controller import Controller
from bumble.link import LocalLink
from bumble.device import Device
from bumble.host import Host
from bumble.transport import AsyncPipeSink
from bumble.avdtp import (
AVDTP_IDLE_STATE,
AVDTP_STREAMING_STATE,
MediaPacketPump,
Protocol,
Listener,
MediaCodecCapabilities,
MediaPacket,
AVDTP_AUDIO_MEDIA_TYPE,
AVDTP_TSEP_SNK,
A2DP_SBC_CODEC_TYPE,
)
from bumble.a2dp import (
SbcMediaCodecInformation,
SBC_MONO_CHANNEL_MODE,
SBC_DUAL_CHANNEL_MODE,
SBC_STEREO_CHANNEL_MODE,
SBC_JOINT_STEREO_CHANNEL_MODE,
SBC_LOUDNESS_ALLOCATION_METHOD,
SBC_SNR_ALLOCATION_METHOD,
)
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
self.link = LocalLink()
self.controllers = [
Controller('C1', link=self.link),
Controller('C2', link=self.link),
]
self.devices = [
Device(
address='F0:F1:F2:F3:F4:F5',
host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])),
),
Device(
address='F5:F4:F3:F2:F1:F0',
host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])),
),
]
self.paired = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_connection():
# Create two devices, each with a controller, attached to the same link
two_devices = TwoDevices()
# Attach listeners
two_devices.devices[0].on(
'connection', lambda connection: two_devices.on_connection(0, connection)
)
two_devices.devices[1].on(
'connection', lambda connection: two_devices.on_connection(1, connection)
)
# Start
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
# Connect the two devices
await two_devices.devices[0].connect(two_devices.devices[1].random_address)
# Check the post conditions
assert two_devices.connections[0] is not None
assert two_devices.connections[1] is not None
# -----------------------------------------------------------------------------
def source_codec_capabilities():
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=SbcMediaCodecInformation.from_discrete_values(
sampling_frequency=44100,
channel_mode=SBC_JOINT_STEREO_CHANNEL_MODE,
block_length=16,
subbands=8,
allocation_method=SBC_LOUDNESS_ALLOCATION_METHOD,
minimum_bitpool_value=2,
maximum_bitpool_value=53,
),
)
# -----------------------------------------------------------------------------
def sink_codec_capabilities():
return MediaCodecCapabilities(
media_type=AVDTP_AUDIO_MEDIA_TYPE,
media_codec_type=A2DP_SBC_CODEC_TYPE,
media_codec_information=SbcMediaCodecInformation.from_lists(
sampling_frequencies=[48000, 44100, 32000, 16000],
channel_modes=[
SBC_MONO_CHANNEL_MODE,
SBC_DUAL_CHANNEL_MODE,
SBC_STEREO_CHANNEL_MODE,
SBC_JOINT_STEREO_CHANNEL_MODE,
],
block_lengths=[4, 8, 12, 16],
subbands=[4, 8],
allocation_methods=[
SBC_LOUDNESS_ALLOCATION_METHOD,
SBC_SNR_ALLOCATION_METHOD,
],
minimum_bitpool_value=2,
maximum_bitpool_value=53,
),
)
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_source_sink_1():
two_devices = TwoDevices()
await two_devices.devices[0].power_on()
await two_devices.devices[1].power_on()
def on_rtp_packet(packet):
rtp_packets.append(packet)
if len(rtp_packets) == rtp_packets_expected:
rtp_packets_fully_received.set_result(None)
sink = None
def on_avdtp_connection(server):
nonlocal sink
sink = server.add_sink(sink_codec_capabilities())
sink.on('rtp_packet', on_rtp_packet)
# Create a listener to wait for AVDTP connections
listener = Listener(Listener.create_registrar(two_devices.devices[1]))
listener.on('connection', on_avdtp_connection)
connection = await two_devices.devices[0].connect(
two_devices.devices[1].random_address
)
client = await Protocol.connect(connection)
endpoints = await client.discover_remote_endpoints()
assert len(endpoints) == 1
remote_sink = list(endpoints)[0]
assert remote_sink.in_use == 0
assert remote_sink.media_type == AVDTP_AUDIO_MEDIA_TYPE
assert remote_sink.tsep == AVDTP_TSEP_SNK
async def generate_packets(packet_count):
sequence_number = 0
timestamp = 0
for i in range(packet_count):
payload = bytes([sequence_number % 256])
packet = MediaPacket(
2, 0, 0, 0, sequence_number, timestamp, 0, [], 96, payload
)
packet.timestamp_seconds = timestamp / 44100
timestamp += 10
sequence_number += 1
yield packet
# Send packets using a pump object
rtp_packets_fully_received = asyncio.get_running_loop().create_future()
rtp_packets_expected = 3
rtp_packets = []
pump = MediaPacketPump(generate_packets(3))
source = client.add_source(source_codec_capabilities(), pump)
stream = await client.create_stream(source, remote_sink)
await stream.start()
assert stream.state == AVDTP_STREAMING_STATE
assert stream.local_endpoint.in_use == 1
assert stream.rtp_channel is not None
assert sink.in_use == 1
assert sink.stream is not None
assert sink.stream.state == AVDTP_STREAMING_STATE
await rtp_packets_fully_received
await stream.close()
assert stream.rtp_channel is None
assert source.in_use == 0
assert source.stream.state == AVDTP_IDLE_STATE
assert sink.in_use == 0
assert sink.stream.state == AVDTP_IDLE_STATE
# Send packets manually
rtp_packets_fully_received = asyncio.get_running_loop().create_future()
rtp_packets_expected = 3
rtp_packets = []
source_packets = [
MediaPacket(2, 0, 0, 0, i, i * 10, 0, [], 96, bytes([i])) for i in range(3)
]
source = client.add_source(source_codec_capabilities(), None)
stream = await client.create_stream(source, remote_sink)
await stream.start()
assert stream.state == AVDTP_STREAMING_STATE
assert stream.local_endpoint.in_use == 1
assert stream.rtp_channel is not None
assert sink.in_use == 1
assert sink.stream is not None
assert sink.stream.state == AVDTP_STREAMING_STATE
stream.send_media_packet(source_packets[0])
stream.send_media_packet(source_packets[1])
stream.send_media_packet(source_packets[2])
await stream.close()
assert stream.rtp_channel is None
assert len(rtp_packets) == 3
assert source.in_use == 0
assert source.stream.state == AVDTP_IDLE_STATE
assert sink.in_use == 0
assert sink.stream.state == AVDTP_IDLE_STATE
# -----------------------------------------------------------------------------
async def run_test_self():
await test_self_connection()
await test_source_sink_1()
# -----------------------------------------------------------------------------
if __name__ == '__main__':
logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
asyncio.run(run_test_self())