# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| |
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
| import asyncio |
| import struct |
| |
| import pytest |
| |
| from bumble import core |
| from bumble import device |
| from bumble import host |
| from bumble import controller |
| from bumble import link |
| from bumble import avc |
| from bumble import avrcp |
| from bumble import avctp |
| from bumble.transport import common |
| |
| |
# -----------------------------------------------------------------------------
class TwoDevices:
    """Test fixture: two Classic-enabled devices sharing one local link.

    ``controllers``, ``devices``, ``connections`` and ``protocols`` are all
    two-element lists. ``connections`` and ``protocols`` hold ``None``
    placeholders until ``setup_connections()`` has run.
    """

    def __init__(self):
        addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
        self.link = link.LocalLink()

        # One controller per address, both attached to the same link.
        self.controllers = [
            controller.Controller(name, link=self.link, public_address=address)
            for name, address in zip(('C1', 'C2'), addresses)
        ]

        # Each device gets its own host, wired to the controller that has
        # the same address.
        self.devices = [
            device.Device(
                address=address,
                host=host.Host(ctrl, common.AsyncPipeSink(ctrl)),
            )
            for address, ctrl in zip(addresses, self.controllers)
        ]
        for dev in self.devices:
            dev.classic_enabled = True

        # Placeholders, replaced by setup_connections().
        self.connections = [None, None]
        self.protocols = [None, None]

    def on_connection(self, which, connection):
        """Store ``connection`` at index ``which`` of ``self.connections``."""
        self.connections[which] = connection

    async def setup_connections(self):
        """Power on both devices, connect them and start AVRCP on top.

        Device 0 initiates a BR/EDR connection to device 1 while device 1
        accepts it; both results are stored, in that order, in
        ``self.connections``. Two ``avrcp.Protocol`` instances are then
        created and stored in ``self.protocols``.
        """
        for dev in self.devices:
            await dev.power_on()

        initiator, acceptor = self.devices
        self.connections = await asyncio.gather(
            initiator.connect(acceptor.public_address, core.BT_BR_EDR_TRANSPORT),
            acceptor.accept(initiator.public_address),
        )

        self.protocols = [avrcp.Protocol(), avrcp.Protocol()]
        # NOTE: protocol and device indices are crossed here. Protocol 0
        # listens on device 1, and protocol 1 connects over connections[0],
        # which is the connection returned by device 0's connect() call.
        self.protocols[0].listen(self.devices[1])
        await self.protocols[1].connect(self.connections[0])
| |
| |
# -----------------------------------------------------------------------------
def test_frame_parser():
    """Check how ``avc.Frame.from_bytes`` handles a handful of raw frames."""
    # This byte sequence is expected to be rejected with ValueError.
    with pytest.raises(ValueError):
        avc.Frame.from_bytes(bytes.fromhex("11480000"))

    # Two PANEL frames with opcode 8 that differ only in how the subunit ID
    # is encoded (presumably the extended subunit-ID forms; confirm against
    # avc.Frame.from_bytes).
    for raw_hex, expected_subunit_id in (("014D0208", 7), ("014DFF0108", 260)):
        frame = avc.Frame.from_bytes(bytes.fromhex(raw_hex))
        assert frame.subunit_type == avc.Frame.SubunitType.PANEL
        assert frame.subunit_id == expected_subunit_id
        assert frame.opcode == 8

    # A STATUS command addressed to PANEL subunit 0 with opcode 0 must be
    # parsed as a CommandFrame.
    frame = avc.Frame.from_bytes(bytes.fromhex("0148000019581000000103"))
    assert isinstance(frame, avc.CommandFrame)
    assert frame.ctype == avc.CommandFrame.CommandType.STATUS
    assert frame.subunit_type == avc.Frame.SubunitType.PANEL
    assert frame.subunit_id == 0
    assert frame.opcode == 0
| |
| |
# -----------------------------------------------------------------------------
def test_vendor_dependent_command():
    """Parse a vendor-dependent STATUS command, then rebuild the same bytes."""
    encoded = bytes.fromhex("0148000019581000000103")
    vendor_data = bytes.fromhex("1000000103")

    # Decoding: the parser must pick the vendor-dependent frame class and
    # expose the company ID and the vendor payload.
    parsed = avc.Frame.from_bytes(encoded)
    assert isinstance(parsed, avc.VendorDependentCommandFrame)
    assert parsed.company_id == 0x1958
    assert parsed.vendor_dependent_data == vendor_data

    # Encoding: a frame built from the same fields must serialize to the
    # original byte string.
    rebuilt = avc.VendorDependentCommandFrame(
        avc.CommandFrame.CommandType.STATUS,
        avc.Frame.SubunitType.PANEL,
        0,
        0x1958,
        vendor_data,
    )
    assert bytes(rebuilt) == encoded
| |
| |
# -----------------------------------------------------------------------------
def test_avctp_message_assembler():
    """Feed single and fragmented PDUs to ``avctp.MessageAssembler``.

    Every PDU uses the same first-byte pattern except for the two-bit field
    at bits 2-3, which is the only thing that varies between the packets.
    """
    delivered = []

    # NOTE(review): bit 1 of the first byte is set in every PDU below, yet
    # the second callback argument is asserted to be False. Confirm whether
    # that argument is really "is_response" or rather "is_command".
    def on_message(transaction_label, is_response, ipid, pid, payload):
        delivered.append((transaction_label, is_response, ipid, pid, payload))

    def first_byte(packet_type):
        # Shifts bind tighter than `|`, so this ORs four separate bit fields.
        return 1 << 4 | packet_type << 2 | 1 << 1 | 0

    assembler = avctp.MessageAssembler(on_message)

    # A lone packet of type 0b00 is delivered immediately.
    payload = bytes.fromhex("01")
    assembler.on_pdu(bytes([first_byte(0b00), 0x11, 0x22]) + payload)
    assert delivered
    assert delivered[0] == (1, False, False, 0x1122, payload)

    # A type-0b01 packet alone delivers nothing; a following type-0b00
    # packet is delivered with its own payload.
    delivered.clear()
    payload = bytes.fromhex("010203")
    assembler.on_pdu(bytes([first_byte(0b01), 0x11, 0x22]) + payload)
    assert not delivered
    assembler.on_pdu(bytes([first_byte(0b00), 0x11, 0x22]) + payload)
    assert delivered
    assert delivered[0] == (1, False, False, 0x1122, payload)

    # Three packets (types 0b01, 0b10, 0b11), each carrying one payload
    # byte, are reassembled into one message. Only the first packet has the
    # extra byte (3) right after the header byte.
    delivered.clear()
    payload = bytes.fromhex("010203")
    assembler.on_pdu(bytes([first_byte(0b01), 3, 0x11, 0x22]) + payload[0:1])
    assembler.on_pdu(bytes([first_byte(0b10), 0x11, 0x22]) + payload[1:2])
    assembler.on_pdu(bytes([first_byte(0b11), 0x11, 0x22]) + payload[2:3])
    assert delivered
    assert delivered[0] == (1, False, False, 0x1122, payload)

    # NOTE(review): a disabled case used to follow here. It sent a lone
    # type-0b11 packet and expected no delivery, but it built the packet
    # with struct.pack(">BBH", ...) instead of the byte layout used above,
    # so it needs rewriting before it can be re-enabled.
| |
| |
# -----------------------------------------------------------------------------
def test_avrcp_pdu_assembler():
    """Feed single and fragmented packets to ``avrcp.PduAssembler``."""
    collected = []

    def on_pdu(pdu_id, parameter):
        collected.append((pdu_id, parameter))

    def packet(packet_type, declared_length, chunk):
        # Big-endian header: one byte 0x10 (reported back as the PDU ID),
        # one byte for the varying packet-type field, then a 16-bit length.
        return struct.pack(">BBH", 0x10, packet_type, declared_length) + chunk

    assembler = avrcp.PduAssembler(on_pdu)

    # A lone type-0b00 packet is delivered immediately.
    parameter = bytes.fromhex("01")
    assembler.on_pdu(packet(0b00, len(parameter), parameter))
    assert collected
    assert collected[0] == (0x10, parameter)

    # A type-0b01 packet alone delivers nothing; a following type-0b00
    # packet is delivered with its own parameter.
    collected.clear()
    parameter = bytes.fromhex("010203")
    assembler.on_pdu(packet(0b01, len(parameter), parameter))
    assert not collected
    assembler.on_pdu(packet(0b00, len(parameter), parameter))
    assert collected
    assert collected[0] == (0x10, parameter)

    # Three one-byte fragments (types 0b01, 0b10, 0b11) are reassembled
    # into a single PDU.
    collected.clear()
    parameter = bytes.fromhex("010203")
    assembler.on_pdu(packet(0b01, 1, parameter[0:1]))
    assembler.on_pdu(packet(0b10, 1, parameter[1:2]))
    assembler.on_pdu(packet(0b11, 1, parameter[2:3]))
    assert collected
    assert collected[0] == (0x10, parameter)

    # A type-0b11 packet with nothing before it must not produce a PDU.
    collected.clear()
    parameter = bytes.fromhex("010203")
    assembler.on_pdu(packet(0b11, len(parameter), parameter))
    assert not collected
| |
| |
def test_passthrough_commands():
    """Round-trip a PLAY/PRESSED pass-through CONTROL command via bytes."""
    pass_through = avc.PassThroughCommandFrame

    original = pass_through(
        avc.CommandFrame.CommandType.CONTROL,
        avc.CommandFrame.SubunitType.PANEL,
        0,
        pass_through.StateFlag.PRESSED,
        pass_through.OperationId.PLAY,
        b'',
    )
    encoded = bytes(original)

    # The generic parser must return the pass-through subclass, keep the
    # operation ID, and re-serialize to exactly the same bytes.
    decoded = avc.Frame.from_bytes(encoded)
    assert isinstance(decoded, pass_through)
    assert decoded.operation_id == pass_through.OperationId.PLAY
    assert bytes(decoded) == encoded
| |
| |
# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_supported_events():
    """Query supported events over two connected AVRCP protocol instances."""
    fixture = TwoDevices()
    await fixture.setup_connections()
    first, second = fixture.protocols

    # Before this test assigns any delegate, the query made from the first
    # protocol is expected to return an empty list.
    assert await first.get_supported_events() == []

    # Once the first protocol has a delegate created with VOLUME_CHANGED,
    # the query made from the second protocol must report exactly that event.
    first.delegate = avrcp.Delegate([avrcp.EventId.VOLUME_CHANGED])
    assert await second.get_supported_events() == [avrcp.EventId.VOLUME_CHANGED]
| |
| |
# -----------------------------------------------------------------------------
# Allow running this module directly, without pytest.
if __name__ == '__main__':
    test_frame_parser()
    test_vendor_dependent_command()
    test_avctp_message_assembler()
    test_avrcp_pdu_assembler()
    test_passthrough_commands()
    # test_get_supported_events is a coroutine function: a bare call only
    # creates a coroutine object and never runs the test body, so it has to
    # be driven by an event loop.
    asyncio.run(test_get_supported_events())