Merge remote-tracking branch 'aosp/upstream-main' am: 5bc991d3cc am: f02f1a6083
Original change: https://android-review.googlesource.com/c/platform/external/python/bumble/+/3264414
Change-Id: I493c824014c05412e8cdfd30889f095ca62e8b41
Signed-off-by: Automerger Merge Worker <[email protected]>
diff --git a/METADATA b/METADATA
index 8554782..2f4e87b 100644
--- a/METADATA
+++ b/METADATA
@@ -11,7 +11,7 @@
type: GIT
value: "https://github.com/google/bumble"
}
- version: "7b34bb405073f4f4b9af5937149f2ad677ed6232"
- last_upgrade_date { year: 2024 month: 9 day: 05 }
+ version: "cd9feeb4551f478691e33450219adf6d58c4871f"
+ last_upgrade_date { year: 2024 month: 9 day: 12 }
license_type: NOTICE
}
diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py
index 1336d2c..c332bf0 100644
--- a/bumble/drivers/rtk.py
+++ b/bumble/drivers/rtk.py
@@ -301,6 +301,8 @@
fw_name: str = ""
config_name: str = ""
+ POST_RESET_DELAY: float = 0.2
+
DRIVER_INFOS = [
# 8723A
DriverInfo(
@@ -495,12 +497,24 @@
@classmethod
async def driver_info_for_host(cls, host):
- await host.send_command(HCI_Reset_Command(), check_result=True)
- host.ready = True # Needed to let the host know the controller is ready.
+ try:
+ await host.send_command(
+ HCI_Reset_Command(),
+ check_result=True,
+ response_timeout=cls.POST_RESET_DELAY,
+ )
+ host.ready = True # Needed to let the host know the controller is ready.
+ except asyncio.exceptions.TimeoutError:
+ logger.warning("timeout waiting for hci reset, retrying")
+ await host.send_command(HCI_Reset_Command(), check_result=True)
+ host.ready = True
- response = await host.send_command(
- HCI_Read_Local_Version_Information_Command(), check_result=True
- )
+ command = HCI_Read_Local_Version_Information_Command()
+ response = await host.send_command(command, check_result=True)
+ if response.command_opcode != command.op_code:
+ logger.error("failed to probe local version information")
+ return None
+
local_version = response.return_parameters
logger.debug(
diff --git a/bumble/host.py b/bumble/host.py
index 8085d5c..a3d3dad 100644
--- a/bumble/host.py
+++ b/bumble/host.py
@@ -171,7 +171,7 @@
self.cis_links = {} # CIS links, by connection handle
self.sco_links = {} # SCO links, by connection handle
self.pending_command = None
- self.pending_response = None
+ self.pending_response: Optional[asyncio.Future[Any]] = None
self.number_of_supported_advertising_sets = 0
self.maximum_advertising_data_length = 31
self.local_version = None
@@ -514,7 +514,9 @@
if self.hci_sink:
self.hci_sink.on_packet(bytes(packet))
- async def send_command(self, command, check_result=False):
+ async def send_command(
+ self, command, check_result=False, response_timeout: Optional[int] = None
+ ):
# Wait until we can send (only one pending command at a time)
async with self.command_semaphore:
assert self.pending_command is None
@@ -526,12 +528,13 @@
try:
self.send_hci_packet(command)
- response = await self.pending_response
+ await asyncio.wait_for(self.pending_response, timeout=response_timeout)
+ response = self.pending_response.result()
# Check the return parameters if required
if check_result:
if isinstance(response, hci.HCI_Command_Status_Event):
- status = response.status
+ status = response.status # type: ignore[attr-defined]
elif isinstance(response.return_parameters, int):
status = response.return_parameters
elif isinstance(response.return_parameters, bytes):
@@ -625,14 +628,21 @@
# Packet Sink protocol (packets coming from the controller via HCI)
def on_packet(self, packet: bytes) -> None:
- hci_packet = hci.HCI_Packet.from_bytes(packet)
+ try:
+ hci_packet = hci.HCI_Packet.from_bytes(packet)
+ except Exception as error:
+ logger.warning(f'!!! error parsing packet from bytes: {error}')
+ return
+
if self.ready or (
isinstance(hci_packet, hci.HCI_Command_Complete_Event)
and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
):
self.on_hci_packet(hci_packet)
else:
- logger.debug('reset not done, ignoring packet from controller')
+ logger.debug(
+ f'reset not done, ignoring packet from controller: {hci_packet}'
+ )
def on_transport_lost(self):
# Called by the source when the transport has been lost.
diff --git a/bumble/profiles/aics.py b/bumble/profiles/aics.py
new file mode 100644
index 0000000..8b7468f
--- /dev/null
+++ b/bumble/profiles/aics.py
@@ -0,0 +1,519 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""LE Audio - Audio Input Control Service"""
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import logging
+import struct
+
+from dataclasses import dataclass
+from typing import Optional
+
+from bumble import gatt
+from bumble.device import Connection
+from bumble.att import ATT_Error
+from bumble.gatt import (
+ Characteristic,
+ DelegatedCharacteristicAdapter,
+ TemplateService,
+ CharacteristicValue,
+ PackedCharacteristicAdapter,
+ GATT_AUDIO_INPUT_CONTROL_SERVICE,
+ GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
+ GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
+ GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
+ GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC,
+ GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
+ GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
+)
+from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
+from bumble.utils import OpenIntEnum
+
+# -----------------------------------------------------------------------------
+# Logging
+# -----------------------------------------------------------------------------
+logger = logging.getLogger(__name__)
+
+
+# -----------------------------------------------------------------------------
+# Constants
+# -----------------------------------------------------------------------------
+CHANGE_COUNTER_MAX_VALUE = 0xFF
+GAIN_SETTINGS_MIN_VALUE = 0
+GAIN_SETTINGS_MAX_VALUE = 255
+
+
+class ErrorCode(OpenIntEnum):
+ '''
+ Cf. 1.6 Application error codes
+ '''
+
+ INVALID_CHANGE_COUNTER = 0x80
+ OPCODE_NOT_SUPPORTED = 0x81
+ MUTE_DISABLED = 0x82
+ VALUE_OUT_OF_RANGE = 0x83
+ GAIN_MODE_CHANGE_NOT_ALLOWED = 0x84
+
+
+class Mute(OpenIntEnum):
+ '''
+ Cf. 2.2.1.2 Mute Field
+ '''
+
+ NOT_MUTED = 0x00
+ MUTED = 0x01
+ DISABLED = 0x02
+
+
+class GainMode(OpenIntEnum):
+ '''
+ Cf. 2.2.1.3 Gain Mode
+ '''
+
+ MANUAL_ONLY = 0x00
+ AUTOMATIC_ONLY = 0x01
+ MANUAL = 0x02
+ AUTOMATIC = 0x03
+
+
+class AudioInputStatus(OpenIntEnum):
+ '''
+ Cf. 3.4 Audio Input Status
+ '''
+
+ INATIVE = 0x00
+ ACTIVE = 0x01
+
+
+class AudioInputControlPointOpCode(OpenIntEnum):
+ '''
+ Cf. 3.5.1 Audio Input Control Point procedure requirements
+ '''
+
+ SET_GAIN_SETTING = 0x00
+ UNMUTE = 0x02
+ MUTE = 0x03
+ SET_MANUAL_GAIN_MODE = 0x04
+ SET_AUTOMATIC_GAIN_MODE = 0x05
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class AudioInputState:
+ '''
+ Cf. 2.2.1 Audio Input State
+ '''
+
+ gain_settings: int = 0
+ mute: Mute = Mute.NOT_MUTED
+ gain_mode: GainMode = GainMode.MANUAL
+ change_counter: int = 0
+ attribute_value: Optional[CharacteristicValue] = None
+
+ def __bytes__(self) -> bytes:
+ return bytes(
+ [self.gain_settings, self.mute, self.gain_mode, self.change_counter]
+ )
+
+ @classmethod
+ def from_bytes(cls, data: bytes):
+ gain_settings, mute, gain_mode, change_counter = struct.unpack("BBBB", data)
+ return cls(gain_settings, mute, gain_mode, change_counter)
+
+ def update_gain_settings_unit(self, gain_settings_unit: int) -> None:
+ self.gain_settings_unit = gain_settings_unit
+
+ def increment_gain_settings(self, gain_settings_unit: int) -> None:
+ self.gain_settings += gain_settings_unit
+ self.increment_change_counter()
+
+ def decrement_gain_settings(self) -> None:
+ self.gain_settings -= self.gain_settings_unit
+ self.increment_change_counter()
+
+ def increment_change_counter(self):
+ self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
+
+ async def notify_subscribers_via_connection(self, connection: Connection) -> None:
+ assert self.attribute_value is not None
+ await connection.device.notify_subscribers(
+ attribute=self.attribute_value, value=bytes(self)
+ )
+
+ def on_read(self, _connection: Optional[Connection]) -> bytes:
+ return bytes(self)
+
+
+@dataclass
+class GainSettingsProperties:
+ '''
+ Cf. 3.2 Gain Settings Properties
+ '''
+
+ gain_settings_unit: int = 1
+ gain_settings_minimum: int = GAIN_SETTINGS_MIN_VALUE
+ gain_settings_maximum: int = GAIN_SETTINGS_MAX_VALUE
+
+ @classmethod
+ def from_bytes(cls, data: bytes):
+ (gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = (
+ struct.unpack('BBB', data)
+ )
+ GainSettingsProperties(
+ gain_settings_unit, gain_settings_minimum, gain_settings_maximum
+ )
+
+ def __bytes__(self) -> bytes:
+ return bytes(
+ [
+ self.gain_settings_unit,
+ self.gain_settings_minimum,
+ self.gain_settings_maximum,
+ ]
+ )
+
+ def on_read(self, _connection: Optional[Connection]) -> bytes:
+ return bytes(self)
+
+
+@dataclass
+class AudioInputControlPoint:
+ '''
+ Cf. 3.5.2 Audio Input Control Point
+ '''
+
+ audio_input_state: AudioInputState
+ gain_settings_properties: GainSettingsProperties
+
+ async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
+ assert connection
+
+ opcode = AudioInputControlPointOpCode(value[0])
+
+ if opcode == AudioInputControlPointOpCode.SET_GAIN_SETTING:
+ gain_settings_operand = value[2]
+ await self._set_gain_settings(connection, gain_settings_operand)
+ elif opcode == AudioInputControlPointOpCode.UNMUTE:
+ await self._unmute(connection)
+ elif opcode == AudioInputControlPointOpCode.MUTE:
+ change_counter_operand = value[1]
+ await self._mute(connection, change_counter_operand)
+ elif opcode == AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE:
+ await self._set_manual_gain_mode(connection)
+ elif opcode == AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE:
+ await self._set_automatic_gain_mode(connection)
+ else:
+ logger.error(f"OpCode value is incorrect: {opcode}")
+ raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
+
+ async def _set_gain_settings(
+ self, connection: Connection, gain_settings_operand: int
+ ) -> None:
+ '''Cf. 3.5.2.1 Set Gain Settings Procedure'''
+
+ gain_mode = self.audio_input_state.gain_mode
+
+ logger.error(f"set_gain_setting: gain_mode: {gain_mode}")
+ if not (gain_mode == GainMode.MANUAL or gain_mode == GainMode.MANUAL_ONLY):
+ logger.warning(
+ "GainMode should be either MANUAL or MANUAL_ONLY Cf Spec Audio Input Control Service 3.5.2.1"
+ )
+ return
+
+ if (
+ gain_settings_operand < self.gain_settings_properties.gain_settings_minimum
+ or gain_settings_operand
+ > self.gain_settings_properties.gain_settings_maximum
+ ):
+ logger.error("gain_seetings value out of range")
+ raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
+
+ if self.audio_input_state.gain_settings != gain_settings_operand:
+ self.audio_input_state.gain_settings = gain_settings_operand
+ await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+ async def _unmute(self, connection: Connection):
+ '''Cf. 3.5.2.2 Unmute procedure'''
+
+ logger.error(f'unmute: {self.audio_input_state.mute}')
+ mute = self.audio_input_state.mute
+ if mute == Mute.DISABLED:
+ logger.error("unmute: Cannot change Mute value, Mute state is DISABLED")
+ raise ATT_Error(ErrorCode.MUTE_DISABLED)
+
+ if mute == Mute.NOT_MUTED:
+ return
+
+ self.audio_input_state.mute = Mute.NOT_MUTED
+ self.audio_input_state.increment_change_counter()
+ await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+ async def _mute(self, connection: Connection, change_counter_operand: int) -> None:
+ '''Cf. 3.5.5.2 Mute procedure'''
+
+ change_counter = self.audio_input_state.change_counter
+ mute = self.audio_input_state.mute
+ if mute == Mute.DISABLED:
+ logger.error("mute: Cannot change Mute value, Mute state is DISABLED")
+ raise ATT_Error(ErrorCode.MUTE_DISABLED)
+
+ if change_counter != change_counter_operand:
+ raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
+
+ if mute == Mute.MUTED:
+ return
+
+ self.audio_input_state.mute = Mute.MUTED
+ self.audio_input_state.increment_change_counter()
+ await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+ async def _set_manual_gain_mode(self, connection: Connection) -> None:
+ '''Cf. 3.5.2.4 Set Manual Gain Mode procedure'''
+
+ gain_mode = self.audio_input_state.gain_mode
+ if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
+ logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
+ raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
+
+ if gain_mode == GainMode.MANUAL:
+ return
+
+ self.audio_input_state.gain_mode = GainMode.MANUAL
+ self.audio_input_state.increment_change_counter()
+ await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+ async def _set_automatic_gain_mode(self, connection: Connection) -> None:
+ '''Cf. 3.5.2.5 Set Automatic Gain Mode'''
+
+ gain_mode = self.audio_input_state.gain_mode
+ if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
+ logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
+ raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
+
+ if gain_mode == GainMode.AUTOMATIC:
+ return
+
+ self.audio_input_state.gain_mode = GainMode.AUTOMATIC
+ self.audio_input_state.increment_change_counter()
+ await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+
+@dataclass
+class AudioInputDescription:
+ '''
+ Cf. 3.6 Audio Input Description
+ '''
+
+ audio_input_description: str = "Bluetooth"
+ attribute_value: Optional[CharacteristicValue] = None
+
+ @classmethod
+ def from_bytes(cls, data: bytes):
+ return cls(audio_input_description=data.decode('utf-8'))
+
+ def __bytes__(self) -> bytes:
+ return self.audio_input_description.encode('utf-8')
+
+ def on_read(self, _connection: Optional[Connection]) -> bytes:
+ return self.audio_input_description.encode('utf-8')
+
+ async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
+ assert connection
+ assert self.attribute_value
+
+ self.audio_input_description = value.decode('utf-8')
+ await connection.device.notify_subscribers(
+ attribute=self.attribute_value, value=value
+ )
+
+
+class AICSService(TemplateService):
+ UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE
+
+ def __init__(
+ self,
+ audio_input_state: Optional[AudioInputState] = None,
+ gain_settings_properties: Optional[GainSettingsProperties] = None,
+ audio_input_type: str = "local",
+ audio_input_status: Optional[AudioInputStatus] = None,
+ audio_input_description: Optional[AudioInputDescription] = None,
+ ):
+ self.audio_input_state = (
+ AudioInputState() if audio_input_state is None else audio_input_state
+ )
+ self.gain_settings_properties = (
+ GainSettingsProperties()
+ if gain_settings_properties is None
+ else gain_settings_properties
+ )
+ self.audio_input_status = (
+ AudioInputStatus.ACTIVE
+ if audio_input_status is None
+ else audio_input_status
+ )
+ self.audio_input_description = (
+ AudioInputDescription()
+ if audio_input_description is None
+ else audio_input_description
+ )
+
+ self.audio_input_control_point: AudioInputControlPoint = AudioInputControlPoint(
+ self.audio_input_state, self.gain_settings_properties
+ )
+
+ self.audio_input_state_characteristic = DelegatedCharacteristicAdapter(
+ Characteristic(
+ uuid=GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
+ properties=Characteristic.Properties.READ
+ | Characteristic.Properties.NOTIFY,
+ permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+ value=CharacteristicValue(read=self.audio_input_state.on_read),
+ ),
+ encode=lambda value: bytes(value),
+ )
+ self.audio_input_state.attribute_value = (
+ self.audio_input_state_characteristic.value
+ )
+
+ self.gain_settings_properties_characteristic = DelegatedCharacteristicAdapter(
+ Characteristic(
+ uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
+ properties=Characteristic.Properties.READ,
+ permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+ value=CharacteristicValue(read=self.gain_settings_properties.on_read),
+ )
+ )
+
+ self.audio_input_type_characteristic = Characteristic(
+ uuid=GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
+ properties=Characteristic.Properties.READ,
+ permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+ value=audio_input_type,
+ )
+
+ self.audio_input_status_characteristic = Characteristic(
+ uuid=GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC,
+ properties=Characteristic.Properties.READ,
+ permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+ value=bytes([self.audio_input_status]),
+ )
+
+ self.audio_input_control_point_characteristic = DelegatedCharacteristicAdapter(
+ Characteristic(
+ uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
+ properties=Characteristic.Properties.WRITE,
+ permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
+ value=CharacteristicValue(
+ write=self.audio_input_control_point.on_write
+ ),
+ )
+ )
+
+ self.audio_input_description_characteristic = DelegatedCharacteristicAdapter(
+ Characteristic(
+ uuid=GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
+ properties=Characteristic.Properties.READ
+ | Characteristic.Properties.NOTIFY
+ | Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
+ permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
+ | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
+ value=CharacteristicValue(
+ write=self.audio_input_description.on_write,
+ read=self.audio_input_description.on_read,
+ ),
+ )
+ )
+ self.audio_input_description.attribute_value = (
+ self.audio_input_control_point_characteristic.value
+ )
+
+ super().__init__(
+ [
+ self.audio_input_state_characteristic, # type: ignore
+ self.gain_settings_properties_characteristic, # type: ignore
+ self.audio_input_type_characteristic, # type: ignore
+ self.audio_input_status_characteristic, # type: ignore
+ self.audio_input_control_point_characteristic, # type: ignore
+ self.audio_input_description_characteristic, # type: ignore
+ ]
+ )
+
+
+# -----------------------------------------------------------------------------
+# Client
+# -----------------------------------------------------------------------------
+class AICSServiceProxy(ProfileServiceProxy):
+ SERVICE_CLASS = AICSService
+
+ def __init__(self, service_proxy: ServiceProxy) -> None:
+ self.service_proxy = service_proxy
+
+ if not (
+ characteristics := service_proxy.get_characteristics_by_uuid(
+ GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
+ )
+ ):
+ raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
+ self.audio_input_state = DelegatedCharacteristicAdapter(
+ characteristic=characteristics[0], decode=AudioInputState.from_bytes
+ )
+
+ if not (
+ characteristics := service_proxy.get_characteristics_by_uuid(
+ GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
+ )
+ ):
+ raise gatt.InvalidServiceError(
+ "Gain Settings Attribute Characteristic not found"
+ )
+ self.gain_settings_properties = PackedCharacteristicAdapter(
+ characteristics[0],
+ 'BBB',
+ )
+
+ if not (
+ characteristics := service_proxy.get_characteristics_by_uuid(
+ GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
+ )
+ ):
+ raise gatt.InvalidServiceError(
+ "Audio Input Status Characteristic not found"
+ )
+ self.audio_input_status = PackedCharacteristicAdapter(
+ characteristics[0],
+ 'B',
+ )
+
+ if not (
+ characteristics := service_proxy.get_characteristics_by_uuid(
+ GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC
+ )
+ ):
+ raise gatt.InvalidServiceError(
+ "Audio Input Control Point Characteristic not found"
+ )
+ self.audio_input_control_point = characteristics[0]
+
+ if not (
+ characteristics := service_proxy.get_characteristics_by_uuid(
+ GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
+ )
+ ):
+ raise gatt.InvalidServiceError(
+ "Audio Input Description Characteristic not found"
+ )
+ self.audio_input_description = characteristics[0]
diff --git a/tests/aics_test.py b/tests/aics_test.py
new file mode 100644
index 0000000..8b47298
--- /dev/null
+++ b/tests/aics_test.py
@@ -0,0 +1,484 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import pytest
+import pytest_asyncio
+
+from bumble import device
+
+from bumble.att import ATT_Error
+
+from bumble.profiles.aics import (
+ Mute,
+ AICSService,
+ AudioInputState,
+ AICSServiceProxy,
+ GainMode,
+ AudioInputStatus,
+ AudioInputControlPointOpCode,
+ GAIN_SETTINGS_MAX_VALUE,
+ GAIN_SETTINGS_MIN_VALUE,
+ ErrorCode,
+)
+
+from .test_utils import TwoDevices
+
+
+# -----------------------------------------------------------------------------
+# Tests
+# -----------------------------------------------------------------------------
+aics_service = AICSService()
+
+
+@pytest_asyncio.fixture
+async def aics_client():
+ devices = TwoDevices()
+ devices[0].add_service(aics_service)
+
+ await devices.setup_connection()
+
+ assert devices.connections[0] is not None
+ assert devices.connections[1] is not None
+
+ devices.connections[0].encryption = 1
+ devices.connections[1].encryption = 1
+
+ peer = device.Peer(devices.connections[1])
+ aics_client = await peer.discover_service_and_create_proxy(AICSServiceProxy)
+
+ yield aics_client
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_init_service(aics_client: AICSServiceProxy):
+ assert await aics_client.audio_input_state.read_value() == AudioInputState(
+ gain_settings=0,
+ mute=Mute.NOT_MUTED,
+ gain_mode=GainMode.MANUAL,
+ change_counter=0,
+ )
+ assert await aics_client.gain_settings_properties.read_value() == (1, 0, 255)
+ assert await aics_client.audio_input_status.read_value() == (
+ AudioInputStatus.ACTIVE
+ )
+
+
[email protected]
+async def test_wrong_opcode_raise_error(aics_client: AICSServiceProxy):
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ 0xFF,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.OPCODE_NOT_SUPPORTED
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_automatic_only(
+ aics_client: AICSServiceProxy,
+):
+ aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
+
+ change_counter = 0
+ gain_settings = 120
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_GAIN_SETTING,
+ change_counter,
+ gain_settings,
+ ]
+ )
+ )
+
+ # Unchanged
+ assert await aics_client.audio_input_state.read_value() == AudioInputState(
+ gain_settings=0,
+ mute=Mute.NOT_MUTED,
+ gain_mode=GainMode.AUTOMATIC_ONLY,
+ change_counter=0,
+ )
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_automatic(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
+ change_counter = 0
+ gain_settings = 120
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_GAIN_SETTING,
+ change_counter,
+ gain_settings,
+ ]
+ )
+ )
+
+ # Unchanged
+ assert await aics_client.audio_input_state.read_value() == AudioInputState(
+ gain_settings=0,
+ mute=Mute.NOT_MUTED,
+ gain_mode=GainMode.AUTOMATIC,
+ change_counter=0,
+ )
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_MANUAL(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.MANUAL
+ change_counter = 0
+ gain_settings = 120
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_GAIN_SETTING,
+ change_counter,
+ gain_settings,
+ ]
+ )
+ )
+
+ assert await aics_client.audio_input_state.read_value() == AudioInputState(
+ gain_settings=gain_settings,
+ mute=Mute.NOT_MUTED,
+ gain_mode=GainMode.MANUAL,
+ change_counter=change_counter,
+ )
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_MANUAL_ONLY(
+ aics_client: AICSServiceProxy,
+):
+ aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
+ change_counter = 0
+ gain_settings = 120
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_GAIN_SETTING,
+ change_counter,
+ gain_settings,
+ ]
+ )
+ )
+
+ assert await aics_client.audio_input_state.read_value() == AudioInputState(
+ gain_settings=gain_settings,
+ mute=Mute.NOT_MUTED,
+ gain_mode=GainMode.MANUAL_ONLY,
+ change_counter=change_counter,
+ )
+
+
[email protected]
+async def test_unmute_when_muted(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.mute = Mute.MUTED
+ change_counter = 0
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.UNMUTE,
+ change_counter,
+ ]
+ )
+ )
+
+ change_counter += 1
+
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.mute == Mute.NOT_MUTED
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_unmute_when_mute_disabled(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.mute = Mute.DISABLED
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.UNMUTE,
+ change_counter,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.MUTE_DISABLED
+
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.mute == Mute.DISABLED
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_mute_when_not_muted(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.mute = Mute.NOT_MUTED
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.MUTE,
+ change_counter,
+ ]
+ )
+ )
+
+ change_counter += 1
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.mute == Mute.MUTED
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_mute_when_mute_disabled(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.mute = Mute.DISABLED
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.MUTE,
+ change_counter,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.MUTE_DISABLED
+
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.mute == Mute.DISABLED
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_automatic(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+ change_counter,
+ ]
+ )
+ )
+
+ change_counter += 1
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.MANUAL
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_already_manual(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.MANUAL
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+ change_counter,
+ ]
+ )
+ )
+
+ # No change expected
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.MANUAL
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_manual_only(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+ change_counter,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.MANUAL_ONLY
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_automatic_only(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+ change_counter,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+ # No change expected
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.AUTOMATIC_ONLY
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_manual(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.MANUAL
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+ change_counter,
+ ]
+ )
+ )
+
+ change_counter += 1
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.AUTOMATIC
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_already_automatic(
+ aics_client: AICSServiceProxy,
+):
+ aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+ change_counter,
+ ]
+ )
+ )
+
+ # No change expected
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.AUTOMATIC
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_manual_only(aics_client: AICSServiceProxy):
+ aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+ change_counter,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+ # No change expected
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.MANUAL_ONLY
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_automatic_only(
+ aics_client: AICSServiceProxy,
+):
+ aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
+ aics_service.audio_input_state.change_counter = 0
+ change_counter = 0
+
+ with pytest.raises(ATT_Error) as e:
+ await aics_client.audio_input_control_point.write_value(
+ bytes(
+ [
+ AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+ change_counter,
+ ]
+ ),
+ with_response=True,
+ )
+
+ assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+ # No change expected
+ state: AudioInputState = await aics_client.audio_input_state.read_value()
+ assert state.gain_mode == GainMode.AUTOMATIC_ONLY
+ assert state.change_counter == change_counter
+
+
[email protected]
+async def test_audio_input_description_initial_value(aics_client: AICSServiceProxy):
+ description = await aics_client.audio_input_description.read_value()
+ assert description.decode('utf-8') == "Bluetooth"
+
+
[email protected]
+async def test_audio_input_description_write_and_read(aics_client: AICSServiceProxy):
+ new_description = "Line Input".encode('utf-8')
+
+ await aics_client.audio_input_description.write_value(new_description)
+
+ description = await aics_client.audio_input_description.read_value()
+ assert description == new_description