blob: 103f3608a9eebeed5f996aa10e8c6ab5a7111942 [file] [log] [blame]
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import struct
import pytest
from bumble import core
from bumble import device
from bumble import host
from bumble import controller
from bumble import link
from bumble import avc
from bumble import avrcp
from bumble import avctp
from bumble.transport import common
# -----------------------------------------------------------------------------
class TwoDevices:
def __init__(self):
self.connections = [None, None]
addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
self.link = link.LocalLink()
self.controllers = [
controller.Controller('C1', link=self.link, public_address=addresses[0]),
controller.Controller('C2', link=self.link, public_address=addresses[1]),
]
self.devices = [
device.Device(
address=addresses[0],
host=host.Host(
self.controllers[0], common.AsyncPipeSink(self.controllers[0])
),
),
device.Device(
address=addresses[1],
host=host.Host(
self.controllers[1], common.AsyncPipeSink(self.controllers[1])
),
),
]
self.devices[0].classic_enabled = True
self.devices[1].classic_enabled = True
self.connections = [None, None]
self.protocols = [None, None]
def on_connection(self, which, connection):
self.connections[which] = connection
async def setup_connections(self):
await self.devices[0].power_on()
await self.devices[1].power_on()
self.connections = await asyncio.gather(
self.devices[0].connect(
self.devices[1].public_address, core.BT_BR_EDR_TRANSPORT
),
self.devices[1].accept(self.devices[0].public_address),
)
self.protocols = [avrcp.Protocol(), avrcp.Protocol()]
self.protocols[0].listen(self.devices[1])
await self.protocols[1].connect(self.connections[0])
# -----------------------------------------------------------------------------
def test_frame_parser():
with pytest.raises(ValueError) as error:
avc.Frame.from_bytes(bytes.fromhex("11480000"))
x = bytes.fromhex("014D0208")
frame = avc.Frame.from_bytes(x)
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 7
assert frame.opcode == 8
x = bytes.fromhex("014DFF0108")
frame = avc.Frame.from_bytes(x)
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 260
assert frame.opcode == 8
x = bytes.fromhex("0148000019581000000103")
frame = avc.Frame.from_bytes(x)
assert isinstance(frame, avc.CommandFrame)
assert frame.ctype == avc.CommandFrame.CommandType.STATUS
assert frame.subunit_type == avc.Frame.SubunitType.PANEL
assert frame.subunit_id == 0
assert frame.opcode == 0
# -----------------------------------------------------------------------------
def test_vendor_dependent_command():
x = bytes.fromhex("0148000019581000000103")
frame = avc.Frame.from_bytes(x)
assert isinstance(frame, avc.VendorDependentCommandFrame)
assert frame.company_id == 0x1958
assert frame.vendor_dependent_data == bytes.fromhex("1000000103")
frame = avc.VendorDependentCommandFrame(
avc.CommandFrame.CommandType.STATUS,
avc.Frame.SubunitType.PANEL,
0,
0x1958,
bytes.fromhex("1000000103"),
)
assert bytes(frame) == x
# -----------------------------------------------------------------------------
def test_avctp_message_assembler():
received_message = []
def on_message(transaction_label, is_response, ipid, pid, payload):
received_message.append((transaction_label, is_response, ipid, pid, payload))
assembler = avctp.MessageAssembler(on_message)
payload = bytes.fromhex("01")
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
received_message = []
payload = bytes.fromhex("010203")
assembler.on_pdu(bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert len(received_message) == 0
assembler.on_pdu(bytes([1 << 4 | 0b00 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
received_message = []
payload = bytes.fromhex("010203")
assembler.on_pdu(
bytes([1 << 4 | 0b01 << 2 | 1 << 1 | 0, 3, 0x11, 0x22]) + payload[0:1]
)
assembler.on_pdu(
bytes([1 << 4 | 0b10 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[1:2]
)
assembler.on_pdu(
bytes([1 << 4 | 0b11 << 2 | 1 << 1 | 0, 0x11, 0x22]) + payload[2:3]
)
assert received_message
assert received_message[0] == (1, False, False, 0x1122, payload)
# received_message = []
# parameter = bytes.fromhex("010203")
# assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
# assert len(received_message) == 0
# -----------------------------------------------------------------------------
def test_avrcp_pdu_assembler():
received_pdus = []
def on_pdu(pdu_id, parameter):
received_pdus.append((pdu_id, parameter))
assembler = avrcp.PduAssembler(on_pdu)
parameter = bytes.fromhex("01")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, len(parameter)) + parameter)
assert len(received_pdus) == 0
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b00, len(parameter)) + parameter)
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b01, 1) + parameter[0:1])
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b10, 1) + parameter[1:2])
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, 1) + parameter[2:3])
assert received_pdus
assert received_pdus[0] == (0x10, parameter)
received_pdus = []
parameter = bytes.fromhex("010203")
assembler.on_pdu(struct.pack(">BBH", 0x10, 0b11, len(parameter)) + parameter)
assert len(received_pdus) == 0
def test_passthrough_commands():
play_pressed = avc.PassThroughCommandFrame(
avc.CommandFrame.CommandType.CONTROL,
avc.CommandFrame.SubunitType.PANEL,
0,
avc.PassThroughCommandFrame.StateFlag.PRESSED,
avc.PassThroughCommandFrame.OperationId.PLAY,
b'',
)
play_pressed_bytes = bytes(play_pressed)
parsed = avc.Frame.from_bytes(play_pressed_bytes)
assert isinstance(parsed, avc.PassThroughCommandFrame)
assert parsed.operation_id == avc.PassThroughCommandFrame.OperationId.PLAY
assert bytes(parsed) == play_pressed_bytes
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_events():
two_devices = TwoDevices()
await two_devices.setup_connections()
supported_events = await two_devices.protocols[0].get_supported_events()
assert supported_events == []
delegate1 = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
two_devices.protocols[0].delegate = delegate1
supported_events = await two_devices.protocols[1].get_supported_events()
assert supported_events == [avrcp.EventId.VOLUME_CHANGED]
# -----------------------------------------------------------------------------
if __name__ == '__main__':
test_frame_parser()
test_vendor_dependent_command()
test_avctp_message_assembler()
test_avrcp_pdu_assembler()
test_passthrough_commands()
test_get_supported_events()