Merge remote-tracking branch 'aosp/upstream-main' am: 5bc991d3cc

Original change: https://android-review.googlesource.com/c/platform/external/python/bumble/+/3264414

Change-Id: I49e6857b366ba21e7abf98b314920ac0ea2ff098
Signed-off-by: Automerger Merge Worker <[email protected]>
diff --git a/METADATA b/METADATA
index 8554782..2f4e87b 100644
--- a/METADATA
+++ b/METADATA
@@ -11,7 +11,7 @@
     type: GIT
     value: "https://github.com/google/bumble"
   }
-  version: "7b34bb405073f4f4b9af5937149f2ad677ed6232"
-  last_upgrade_date { year: 2024 month: 9 day: 05 }
+  version: "cd9feeb4551f478691e33450219adf6d58c4871f"
+  last_upgrade_date { year: 2024 month: 9 day: 12 }
   license_type: NOTICE
 }
diff --git a/bumble/drivers/rtk.py b/bumble/drivers/rtk.py
index 1336d2c..c332bf0 100644
--- a/bumble/drivers/rtk.py
+++ b/bumble/drivers/rtk.py
@@ -301,6 +301,8 @@
         fw_name: str = ""
         config_name: str = ""
 
+    POST_RESET_DELAY: float = 0.2
+
     DRIVER_INFOS = [
         # 8723A
         DriverInfo(
@@ -495,12 +497,24 @@
 
     @classmethod
     async def driver_info_for_host(cls, host):
-        await host.send_command(HCI_Reset_Command(), check_result=True)
-        host.ready = True  # Needed to let the host know the controller is ready.
+        try:
+            await host.send_command(
+                HCI_Reset_Command(),
+                check_result=True,
+                response_timeout=cls.POST_RESET_DELAY,
+            )
+            host.ready = True  # Needed to let the host know the controller is ready.
+        except asyncio.exceptions.TimeoutError:
+            logger.warning("timeout waiting for hci reset, retrying")
+            await host.send_command(HCI_Reset_Command(), check_result=True)
+            host.ready = True
 
-        response = await host.send_command(
-            HCI_Read_Local_Version_Information_Command(), check_result=True
-        )
+        command = HCI_Read_Local_Version_Information_Command()
+        response = await host.send_command(command, check_result=True)
+        if response.command_opcode != command.op_code:
+            logger.error("failed to probe local version information")
+            return None
+
         local_version = response.return_parameters
 
         logger.debug(
diff --git a/bumble/host.py b/bumble/host.py
index 8085d5c..a3d3dad 100644
--- a/bumble/host.py
+++ b/bumble/host.py
@@ -171,7 +171,7 @@
         self.cis_links = {}  # CIS links, by connection handle
         self.sco_links = {}  # SCO links, by connection handle
         self.pending_command = None
-        self.pending_response = None
+        self.pending_response: Optional[asyncio.Future[Any]] = None
         self.number_of_supported_advertising_sets = 0
         self.maximum_advertising_data_length = 31
         self.local_version = None
@@ -514,7 +514,9 @@
         if self.hci_sink:
             self.hci_sink.on_packet(bytes(packet))
 
-    async def send_command(self, command, check_result=False):
+    async def send_command(
+        self, command, check_result=False, response_timeout: Optional[int] = None
+    ):
         # Wait until we can send (only one pending command at a time)
         async with self.command_semaphore:
             assert self.pending_command is None
@@ -526,12 +528,13 @@
 
             try:
                 self.send_hci_packet(command)
-                response = await self.pending_response
+                await asyncio.wait_for(self.pending_response, timeout=response_timeout)
+                response = self.pending_response.result()
 
                 # Check the return parameters if required
                 if check_result:
                     if isinstance(response, hci.HCI_Command_Status_Event):
-                        status = response.status
+                        status = response.status  # type: ignore[attr-defined]
                     elif isinstance(response.return_parameters, int):
                         status = response.return_parameters
                     elif isinstance(response.return_parameters, bytes):
@@ -625,14 +628,21 @@
 
     # Packet Sink protocol (packets coming from the controller via HCI)
     def on_packet(self, packet: bytes) -> None:
-        hci_packet = hci.HCI_Packet.from_bytes(packet)
+        try:
+            hci_packet = hci.HCI_Packet.from_bytes(packet)
+        except Exception as error:
+            logger.warning(f'!!! error parsing packet from bytes: {error}')
+            return
+
         if self.ready or (
             isinstance(hci_packet, hci.HCI_Command_Complete_Event)
             and hci_packet.command_opcode == hci.HCI_RESET_COMMAND
         ):
             self.on_hci_packet(hci_packet)
         else:
-            logger.debug('reset not done, ignoring packet from controller')
+            logger.debug(
+                f'reset not done, ignoring packet from controller: {hci_packet}'
+            )
 
     def on_transport_lost(self):
         # Called by the source when the transport has been lost.
diff --git a/bumble/profiles/aics.py b/bumble/profiles/aics.py
new file mode 100644
index 0000000..8b7468f
--- /dev/null
+++ b/bumble/profiles/aics.py
@@ -0,0 +1,519 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""LE Audio - Audio Input Control Service"""
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import logging
+import struct
+
+from dataclasses import dataclass
+from typing import Optional
+
+from bumble import gatt
+from bumble.device import Connection
+from bumble.att import ATT_Error
+from bumble.gatt import (
+    Characteristic,
+    DelegatedCharacteristicAdapter,
+    TemplateService,
+    CharacteristicValue,
+    PackedCharacteristicAdapter,
+    GATT_AUDIO_INPUT_CONTROL_SERVICE,
+    GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
+    GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
+    GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
+    GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC,
+    GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
+    GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
+)
+from bumble.gatt_client import ProfileServiceProxy, ServiceProxy
+from bumble.utils import OpenIntEnum
+
+# -----------------------------------------------------------------------------
+# Logging
+# -----------------------------------------------------------------------------
+logger = logging.getLogger(__name__)
+
+
+# -----------------------------------------------------------------------------
+# Constants
+# -----------------------------------------------------------------------------
+CHANGE_COUNTER_MAX_VALUE = 0xFF
+GAIN_SETTINGS_MIN_VALUE = 0
+GAIN_SETTINGS_MAX_VALUE = 255
+
+
+class ErrorCode(OpenIntEnum):
+    '''
+    Cf. 1.6 Application error codes
+    '''
+
+    INVALID_CHANGE_COUNTER = 0x80
+    OPCODE_NOT_SUPPORTED = 0x81
+    MUTE_DISABLED = 0x82
+    VALUE_OUT_OF_RANGE = 0x83
+    GAIN_MODE_CHANGE_NOT_ALLOWED = 0x84
+
+
+class Mute(OpenIntEnum):
+    '''
+    Cf. 2.2.1.2 Mute Field
+    '''
+
+    NOT_MUTED = 0x00
+    MUTED = 0x01
+    DISABLED = 0x02
+
+
+class GainMode(OpenIntEnum):
+    '''
+    Cf. 2.2.1.3 Gain Mode
+    '''
+
+    MANUAL_ONLY = 0x00
+    AUTOMATIC_ONLY = 0x01
+    MANUAL = 0x02
+    AUTOMATIC = 0x03
+
+
+class AudioInputStatus(OpenIntEnum):
+    '''
+    Cf. 3.4 Audio Input Status
+    '''
+
+    INATIVE = 0x00
+    ACTIVE = 0x01
+
+
+class AudioInputControlPointOpCode(OpenIntEnum):
+    '''
+    Cf. 3.5.1 Audio Input Control Point procedure requirements
+    '''
+
+    SET_GAIN_SETTING = 0x00
+    UNMUTE = 0x02
+    MUTE = 0x03
+    SET_MANUAL_GAIN_MODE = 0x04
+    SET_AUTOMATIC_GAIN_MODE = 0x05
+
+
+# -----------------------------------------------------------------------------
+@dataclass
+class AudioInputState:
+    '''
+    Cf. 2.2.1 Audio Input State
+    '''
+
+    gain_settings: int = 0
+    mute: Mute = Mute.NOT_MUTED
+    gain_mode: GainMode = GainMode.MANUAL
+    change_counter: int = 0
+    attribute_value: Optional[CharacteristicValue] = None
+
+    def __bytes__(self) -> bytes:
+        return bytes(
+            [self.gain_settings, self.mute, self.gain_mode, self.change_counter]
+        )
+
+    @classmethod
+    def from_bytes(cls, data: bytes):
+        gain_settings, mute, gain_mode, change_counter = struct.unpack("BBBB", data)
+        return cls(gain_settings, mute, gain_mode, change_counter)
+
+    def update_gain_settings_unit(self, gain_settings_unit: int) -> None:
+        self.gain_settings_unit = gain_settings_unit
+
+    def increment_gain_settings(self, gain_settings_unit: int) -> None:
+        self.gain_settings += gain_settings_unit
+        self.increment_change_counter()
+
+    def decrement_gain_settings(self) -> None:
+        self.gain_settings -= self.gain_settings_unit
+        self.increment_change_counter()
+
+    def increment_change_counter(self):
+        self.change_counter = (self.change_counter + 1) % (CHANGE_COUNTER_MAX_VALUE + 1)
+
+    async def notify_subscribers_via_connection(self, connection: Connection) -> None:
+        assert self.attribute_value is not None
+        await connection.device.notify_subscribers(
+            attribute=self.attribute_value, value=bytes(self)
+        )
+
+    def on_read(self, _connection: Optional[Connection]) -> bytes:
+        return bytes(self)
+
+
+@dataclass
+class GainSettingsProperties:
+    '''
+    Cf. 3.2 Gain Settings Properties
+    '''
+
+    gain_settings_unit: int = 1
+    gain_settings_minimum: int = GAIN_SETTINGS_MIN_VALUE
+    gain_settings_maximum: int = GAIN_SETTINGS_MAX_VALUE
+
+    @classmethod
+    def from_bytes(cls, data: bytes):
+        (gain_settings_unit, gain_settings_minimum, gain_settings_maximum) = (
+            struct.unpack('BBB', data)
+        )
+        GainSettingsProperties(
+            gain_settings_unit, gain_settings_minimum, gain_settings_maximum
+        )
+
+    def __bytes__(self) -> bytes:
+        return bytes(
+            [
+                self.gain_settings_unit,
+                self.gain_settings_minimum,
+                self.gain_settings_maximum,
+            ]
+        )
+
+    def on_read(self, _connection: Optional[Connection]) -> bytes:
+        return bytes(self)
+
+
+@dataclass
+class AudioInputControlPoint:
+    '''
+    Cf. 3.5.2 Audio Input Control Point
+    '''
+
+    audio_input_state: AudioInputState
+    gain_settings_properties: GainSettingsProperties
+
+    async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
+        assert connection
+
+        opcode = AudioInputControlPointOpCode(value[0])
+
+        if opcode == AudioInputControlPointOpCode.SET_GAIN_SETTING:
+            gain_settings_operand = value[2]
+            await self._set_gain_settings(connection, gain_settings_operand)
+        elif opcode == AudioInputControlPointOpCode.UNMUTE:
+            await self._unmute(connection)
+        elif opcode == AudioInputControlPointOpCode.MUTE:
+            change_counter_operand = value[1]
+            await self._mute(connection, change_counter_operand)
+        elif opcode == AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE:
+            await self._set_manual_gain_mode(connection)
+        elif opcode == AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE:
+            await self._set_automatic_gain_mode(connection)
+        else:
+            logger.error(f"OpCode value is incorrect: {opcode}")
+            raise ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED)
+
+    async def _set_gain_settings(
+        self, connection: Connection, gain_settings_operand: int
+    ) -> None:
+        '''Cf. 3.5.2.1 Set Gain Settings Procedure'''
+
+        gain_mode = self.audio_input_state.gain_mode
+
+        logger.error(f"set_gain_setting: gain_mode: {gain_mode}")
+        if not (gain_mode == GainMode.MANUAL or gain_mode == GainMode.MANUAL_ONLY):
+            logger.warning(
+                "GainMode should be either MANUAL or MANUAL_ONLY Cf Spec Audio Input Control Service 3.5.2.1"
+            )
+            return
+
+        if (
+            gain_settings_operand < self.gain_settings_properties.gain_settings_minimum
+            or gain_settings_operand
+            > self.gain_settings_properties.gain_settings_maximum
+        ):
+            logger.error("gain_seetings value out of range")
+            raise ATT_Error(ErrorCode.VALUE_OUT_OF_RANGE)
+
+        if self.audio_input_state.gain_settings != gain_settings_operand:
+            self.audio_input_state.gain_settings = gain_settings_operand
+            await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+    async def _unmute(self, connection: Connection):
+        '''Cf. 3.5.2.2 Unmute procedure'''
+
+        logger.error(f'unmute: {self.audio_input_state.mute}')
+        mute = self.audio_input_state.mute
+        if mute == Mute.DISABLED:
+            logger.error("unmute: Cannot change Mute value, Mute state is DISABLED")
+            raise ATT_Error(ErrorCode.MUTE_DISABLED)
+
+        if mute == Mute.NOT_MUTED:
+            return
+
+        self.audio_input_state.mute = Mute.NOT_MUTED
+        self.audio_input_state.increment_change_counter()
+        await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+    async def _mute(self, connection: Connection, change_counter_operand: int) -> None:
+        '''Cf. 3.5.5.2 Mute procedure'''
+
+        change_counter = self.audio_input_state.change_counter
+        mute = self.audio_input_state.mute
+        if mute == Mute.DISABLED:
+            logger.error("mute: Cannot change Mute value, Mute state is DISABLED")
+            raise ATT_Error(ErrorCode.MUTE_DISABLED)
+
+        if change_counter != change_counter_operand:
+            raise ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)
+
+        if mute == Mute.MUTED:
+            return
+
+        self.audio_input_state.mute = Mute.MUTED
+        self.audio_input_state.increment_change_counter()
+        await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+    async def _set_manual_gain_mode(self, connection: Connection) -> None:
+        '''Cf. 3.5.2.4 Set Manual Gain Mode procedure'''
+
+        gain_mode = self.audio_input_state.gain_mode
+        if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
+            logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
+            raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
+
+        if gain_mode == GainMode.MANUAL:
+            return
+
+        self.audio_input_state.gain_mode = GainMode.MANUAL
+        self.audio_input_state.increment_change_counter()
+        await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+    async def _set_automatic_gain_mode(self, connection: Connection) -> None:
+        '''Cf. 3.5.2.5 Set Automatic Gain Mode'''
+
+        gain_mode = self.audio_input_state.gain_mode
+        if gain_mode in (GainMode.AUTOMATIC_ONLY, GainMode.MANUAL_ONLY):
+            logger.error(f"Cannot change gain_mode, bad state: {gain_mode}")
+            raise ATT_Error(ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED)
+
+        if gain_mode == GainMode.AUTOMATIC:
+            return
+
+        self.audio_input_state.gain_mode = GainMode.AUTOMATIC
+        self.audio_input_state.increment_change_counter()
+        await self.audio_input_state.notify_subscribers_via_connection(connection)
+
+
+@dataclass
+class AudioInputDescription:
+    '''
+    Cf. 3.6 Audio Input Description
+    '''
+
+    audio_input_description: str = "Bluetooth"
+    attribute_value: Optional[CharacteristicValue] = None
+
+    @classmethod
+    def from_bytes(cls, data: bytes):
+        return cls(audio_input_description=data.decode('utf-8'))
+
+    def __bytes__(self) -> bytes:
+        return self.audio_input_description.encode('utf-8')
+
+    def on_read(self, _connection: Optional[Connection]) -> bytes:
+        return self.audio_input_description.encode('utf-8')
+
+    async def on_write(self, connection: Optional[Connection], value: bytes) -> None:
+        assert connection
+        assert self.attribute_value
+
+        self.audio_input_description = value.decode('utf-8')
+        await connection.device.notify_subscribers(
+            attribute=self.attribute_value, value=value
+        )
+
+
+class AICSService(TemplateService):
+    UUID = GATT_AUDIO_INPUT_CONTROL_SERVICE
+
+    def __init__(
+        self,
+        audio_input_state: Optional[AudioInputState] = None,
+        gain_settings_properties: Optional[GainSettingsProperties] = None,
+        audio_input_type: str = "local",
+        audio_input_status: Optional[AudioInputStatus] = None,
+        audio_input_description: Optional[AudioInputDescription] = None,
+    ):
+        self.audio_input_state = (
+            AudioInputState() if audio_input_state is None else audio_input_state
+        )
+        self.gain_settings_properties = (
+            GainSettingsProperties()
+            if gain_settings_properties is None
+            else gain_settings_properties
+        )
+        self.audio_input_status = (
+            AudioInputStatus.ACTIVE
+            if audio_input_status is None
+            else audio_input_status
+        )
+        self.audio_input_description = (
+            AudioInputDescription()
+            if audio_input_description is None
+            else audio_input_description
+        )
+
+        self.audio_input_control_point: AudioInputControlPoint = AudioInputControlPoint(
+            self.audio_input_state, self.gain_settings_properties
+        )
+
+        self.audio_input_state_characteristic = DelegatedCharacteristicAdapter(
+            Characteristic(
+                uuid=GATT_AUDIO_INPUT_STATE_CHARACTERISTIC,
+                properties=Characteristic.Properties.READ
+                | Characteristic.Properties.NOTIFY,
+                permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+                value=CharacteristicValue(read=self.audio_input_state.on_read),
+            ),
+            encode=lambda value: bytes(value),
+        )
+        self.audio_input_state.attribute_value = (
+            self.audio_input_state_characteristic.value
+        )
+
+        self.gain_settings_properties_characteristic = DelegatedCharacteristicAdapter(
+            Characteristic(
+                uuid=GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC,
+                properties=Characteristic.Properties.READ,
+                permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+                value=CharacteristicValue(read=self.gain_settings_properties.on_read),
+            )
+        )
+
+        self.audio_input_type_characteristic = Characteristic(
+            uuid=GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC,
+            properties=Characteristic.Properties.READ,
+            permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+            value=audio_input_type,
+        )
+
+        self.audio_input_status_characteristic = Characteristic(
+            uuid=GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC,
+            properties=Characteristic.Properties.READ,
+            permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
+            value=bytes([self.audio_input_status]),
+        )
+
+        self.audio_input_control_point_characteristic = DelegatedCharacteristicAdapter(
+            Characteristic(
+                uuid=GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC,
+                properties=Characteristic.Properties.WRITE,
+                permissions=Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
+                value=CharacteristicValue(
+                    write=self.audio_input_control_point.on_write
+                ),
+            )
+        )
+
+        self.audio_input_description_characteristic = DelegatedCharacteristicAdapter(
+            Characteristic(
+                uuid=GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC,
+                properties=Characteristic.Properties.READ
+                | Characteristic.Properties.NOTIFY
+                | Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
+                permissions=Characteristic.Permissions.READ_REQUIRES_ENCRYPTION
+                | Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
+                value=CharacteristicValue(
+                    write=self.audio_input_description.on_write,
+                    read=self.audio_input_description.on_read,
+                ),
+            )
+        )
+        self.audio_input_description.attribute_value = (
+            self.audio_input_control_point_characteristic.value
+        )
+
+        super().__init__(
+            [
+                self.audio_input_state_characteristic,  # type: ignore
+                self.gain_settings_properties_characteristic,  # type: ignore
+                self.audio_input_type_characteristic,  # type: ignore
+                self.audio_input_status_characteristic,  # type: ignore
+                self.audio_input_control_point_characteristic,  # type: ignore
+                self.audio_input_description_characteristic,  # type: ignore
+            ]
+        )
+
+
+# -----------------------------------------------------------------------------
+# Client
+# -----------------------------------------------------------------------------
+class AICSServiceProxy(ProfileServiceProxy):
+    SERVICE_CLASS = AICSService
+
+    def __init__(self, service_proxy: ServiceProxy) -> None:
+        self.service_proxy = service_proxy
+
+        if not (
+            characteristics := service_proxy.get_characteristics_by_uuid(
+                GATT_AUDIO_INPUT_STATE_CHARACTERISTIC
+            )
+        ):
+            raise gatt.InvalidServiceError("Audio Input State Characteristic not found")
+        self.audio_input_state = DelegatedCharacteristicAdapter(
+            characteristic=characteristics[0], decode=AudioInputState.from_bytes
+        )
+
+        if not (
+            characteristics := service_proxy.get_characteristics_by_uuid(
+                GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC
+            )
+        ):
+            raise gatt.InvalidServiceError(
+                "Gain Settings Attribute Characteristic not found"
+            )
+        self.gain_settings_properties = PackedCharacteristicAdapter(
+            characteristics[0],
+            'BBB',
+        )
+
+        if not (
+            characteristics := service_proxy.get_characteristics_by_uuid(
+                GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC
+            )
+        ):
+            raise gatt.InvalidServiceError(
+                "Audio Input Status Characteristic not found"
+            )
+        self.audio_input_status = PackedCharacteristicAdapter(
+            characteristics[0],
+            'B',
+        )
+
+        if not (
+            characteristics := service_proxy.get_characteristics_by_uuid(
+                GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC
+            )
+        ):
+            raise gatt.InvalidServiceError(
+                "Audio Input Control Point Characteristic not found"
+            )
+        self.audio_input_control_point = characteristics[0]
+
+        if not (
+            characteristics := service_proxy.get_characteristics_by_uuid(
+                GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC
+            )
+        ):
+            raise gatt.InvalidServiceError(
+                "Audio Input Description Characteristic not found"
+            )
+        self.audio_input_description = characteristics[0]
diff --git a/tests/aics_test.py b/tests/aics_test.py
new file mode 100644
index 0000000..8b47298
--- /dev/null
+++ b/tests/aics_test.py
@@ -0,0 +1,484 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import pytest
+import pytest_asyncio
+
+from bumble import device
+
+from bumble.att import ATT_Error
+
+from bumble.profiles.aics import (
+    Mute,
+    AICSService,
+    AudioInputState,
+    AICSServiceProxy,
+    GainMode,
+    AudioInputStatus,
+    AudioInputControlPointOpCode,
+    GAIN_SETTINGS_MAX_VALUE,
+    GAIN_SETTINGS_MIN_VALUE,
+    ErrorCode,
+)
+
+from .test_utils import TwoDevices
+
+
+# -----------------------------------------------------------------------------
+# Tests
+# -----------------------------------------------------------------------------
+aics_service = AICSService()
+
+
+@pytest_asyncio.fixture
+async def aics_client():
+    devices = TwoDevices()
+    devices[0].add_service(aics_service)
+
+    await devices.setup_connection()
+
+    assert devices.connections[0] is not None
+    assert devices.connections[1] is not None
+
+    devices.connections[0].encryption = 1
+    devices.connections[1].encryption = 1
+
+    peer = device.Peer(devices.connections[1])
+    aics_client = await peer.discover_service_and_create_proxy(AICSServiceProxy)
+
+    yield aics_client
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_init_service(aics_client: AICSServiceProxy):
+    assert await aics_client.audio_input_state.read_value() == AudioInputState(
+        gain_settings=0,
+        mute=Mute.NOT_MUTED,
+        gain_mode=GainMode.MANUAL,
+        change_counter=0,
+    )
+    assert await aics_client.gain_settings_properties.read_value() == (1, 0, 255)
+    assert await aics_client.audio_input_status.read_value() == (
+        AudioInputStatus.ACTIVE
+    )
+
+
[email protected]
+async def test_wrong_opcode_raise_error(aics_client: AICSServiceProxy):
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    0xFF,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.OPCODE_NOT_SUPPORTED
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_automatic_only(
+    aics_client: AICSServiceProxy,
+):
+    aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
+
+    change_counter = 0
+    gain_settings = 120
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_GAIN_SETTING,
+                change_counter,
+                gain_settings,
+            ]
+        )
+    )
+
+    # Unchanged
+    assert await aics_client.audio_input_state.read_value() == AudioInputState(
+        gain_settings=0,
+        mute=Mute.NOT_MUTED,
+        gain_mode=GainMode.AUTOMATIC_ONLY,
+        change_counter=0,
+    )
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_automatic(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
+    change_counter = 0
+    gain_settings = 120
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_GAIN_SETTING,
+                change_counter,
+                gain_settings,
+            ]
+        )
+    )
+
+    # Unchanged
+    assert await aics_client.audio_input_state.read_value() == AudioInputState(
+        gain_settings=0,
+        mute=Mute.NOT_MUTED,
+        gain_mode=GainMode.AUTOMATIC,
+        change_counter=0,
+    )
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_MANUAL(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.MANUAL
+    change_counter = 0
+    gain_settings = 120
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_GAIN_SETTING,
+                change_counter,
+                gain_settings,
+            ]
+        )
+    )
+
+    assert await aics_client.audio_input_state.read_value() == AudioInputState(
+        gain_settings=gain_settings,
+        mute=Mute.NOT_MUTED,
+        gain_mode=GainMode.MANUAL,
+        change_counter=change_counter,
+    )
+
+
[email protected]
+async def test_set_gain_setting_when_gain_mode_MANUAL_ONLY(
+    aics_client: AICSServiceProxy,
+):
+    aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
+    change_counter = 0
+    gain_settings = 120
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_GAIN_SETTING,
+                change_counter,
+                gain_settings,
+            ]
+        )
+    )
+
+    assert await aics_client.audio_input_state.read_value() == AudioInputState(
+        gain_settings=gain_settings,
+        mute=Mute.NOT_MUTED,
+        gain_mode=GainMode.MANUAL_ONLY,
+        change_counter=change_counter,
+    )
+
+
[email protected]
+async def test_unmute_when_muted(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.mute = Mute.MUTED
+    change_counter = 0
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.UNMUTE,
+                change_counter,
+            ]
+        )
+    )
+
+    change_counter += 1
+
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.mute == Mute.NOT_MUTED
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_unmute_when_mute_disabled(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.mute = Mute.DISABLED
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    AudioInputControlPointOpCode.UNMUTE,
+                    change_counter,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.MUTE_DISABLED
+
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.mute == Mute.DISABLED
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_mute_when_not_muted(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.mute = Mute.NOT_MUTED
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.MUTE,
+                change_counter,
+            ]
+        )
+    )
+
+    change_counter += 1
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.mute == Mute.MUTED
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_mute_when_mute_disabled(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.mute = Mute.DISABLED
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    AudioInputControlPointOpCode.MUTE,
+                    change_counter,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.MUTE_DISABLED
+
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.mute == Mute.DISABLED
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_automatic(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+                change_counter,
+            ]
+        )
+    )
+
+    change_counter += 1
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.MANUAL
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_already_manual(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.MANUAL
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+                change_counter,
+            ]
+        )
+    )
+
+    # No change expected
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.MANUAL
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_manual_only(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+                    change_counter,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.MANUAL_ONLY
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_manual_gain_mode_when_automatic_only(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    AudioInputControlPointOpCode.SET_MANUAL_GAIN_MODE,
+                    change_counter,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+    # No change expected
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.AUTOMATIC_ONLY
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_manual(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.MANUAL
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+                change_counter,
+            ]
+        )
+    )
+
+    change_counter += 1
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.AUTOMATIC
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_already_automatic(
+    aics_client: AICSServiceProxy,
+):
+    aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    await aics_client.audio_input_control_point.write_value(
+        bytes(
+            [
+                AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+                change_counter,
+            ]
+        )
+    )
+
+    # No change expected
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.AUTOMATIC
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_manual_only(aics_client: AICSServiceProxy):
+    aics_service.audio_input_state.gain_mode = GainMode.MANUAL_ONLY
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+                    change_counter,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+    # No change expected
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.MANUAL_ONLY
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_set_automatic_gain_mode_when_automatic_only(
+    aics_client: AICSServiceProxy,
+):
+    aics_service.audio_input_state.gain_mode = GainMode.AUTOMATIC_ONLY
+    aics_service.audio_input_state.change_counter = 0
+    change_counter = 0
+
+    with pytest.raises(ATT_Error) as e:
+        await aics_client.audio_input_control_point.write_value(
+            bytes(
+                [
+                    AudioInputControlPointOpCode.SET_AUTOMATIC_GAIN_MODE,
+                    change_counter,
+                ]
+            ),
+            with_response=True,
+        )
+
+    assert e.value.error_code == ErrorCode.GAIN_MODE_CHANGE_NOT_ALLOWED
+
+    # No change expected
+    state: AudioInputState = await aics_client.audio_input_state.read_value()
+    assert state.gain_mode == GainMode.AUTOMATIC_ONLY
+    assert state.change_counter == change_counter
+
+
[email protected]
+async def test_audio_input_description_initial_value(aics_client: AICSServiceProxy):
+    description = await aics_client.audio_input_description.read_value()
+    assert description.decode('utf-8') == "Bluetooth"
+
+
[email protected]
+async def test_audio_input_description_write_and_read(aics_client: AICSServiceProxy):
+    new_description = "Line Input".encode('utf-8')
+
+    await aics_client.audio_input_description.write_value(new_description)
+
+    description = await aics_client.audio_input_description.read_value()
+    assert description == new_description