| # Copyright 2023 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import collections.abc |
| import logging |
| import asyncio |
| import dataclasses |
| import enum |
| import traceback |
| import warnings |
| from typing import Dict, List, Union, Set, TYPE_CHECKING |
| |
| from . import at |
| from . import rfcomm |
| |
| from bumble.colors import color |
| from bumble.core import ( |
| ProtocolError, |
| BT_GENERIC_AUDIO_SERVICE, |
| BT_HANDSFREE_SERVICE, |
| BT_L2CAP_PROTOCOL_ID, |
| BT_RFCOMM_PROTOCOL_ID, |
| ) |
| from bumble.sdp import ( |
| DataElement, |
| ServiceAttribute, |
| SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, |
| SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, |
| SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Logging |
| # ----------------------------------------------------------------------------- |
| logger = logging.getLogger(__name__) |
| |
| # ----------------------------------------------------------------------------- |
| # Error |
| # ----------------------------------------------------------------------------- |
| |
| |
| class HfpProtocolError(ProtocolError): |
| def __init__(self, error_name: str = '', details: str = ''): |
| super().__init__(None, 'hfp', error_name, details) |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Protocol Support |
| # ----------------------------------------------------------------------------- |
| |
| # ----------------------------------------------------------------------------- |
| class HfpProtocol: |
| dlc: rfcomm.DLC |
| buffer: str |
| lines: collections.deque |
| lines_available: asyncio.Event |
| |
| def __init__(self, dlc: rfcomm.DLC) -> None: |
| warnings.warn("See HfProtocol", DeprecationWarning) |
| self.dlc = dlc |
| self.buffer = '' |
| self.lines = collections.deque() |
| self.lines_available = asyncio.Event() |
| |
| dlc.sink = self.feed |
| |
| def feed(self, data: Union[bytes, str]) -> None: |
| # Convert the data to a string if needed |
| if isinstance(data, bytes): |
| data = data.decode('utf-8') |
| |
| logger.debug(f'<<< Data received: {data}') |
| |
| # Add to the buffer and look for lines |
| self.buffer += data |
| while (separator := self.buffer.find('\r')) >= 0: |
| line = self.buffer[:separator].strip() |
| self.buffer = self.buffer[separator + 1 :] |
| if len(line) > 0: |
| self.on_line(line) |
| |
| def on_line(self, line: str) -> None: |
| self.lines.append(line) |
| self.lines_available.set() |
| |
| def send_command_line(self, line: str) -> None: |
| logger.debug(color(f'>>> {line}', 'yellow')) |
| self.dlc.write(line + '\r') |
| |
| def send_response_line(self, line: str) -> None: |
| logger.debug(color(f'>>> {line}', 'yellow')) |
| self.dlc.write('\r\n' + line + '\r\n') |
| |
| async def next_line(self) -> str: |
| await self.lines_available.wait() |
| line = self.lines.popleft() |
| if not self.lines: |
| self.lines_available.clear() |
| logger.debug(color(f'<<< {line}', 'green')) |
| return line |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Normative protocol definitions |
| # ----------------------------------------------------------------------------- |
| |
| |
| # HF supported features (AT+BRSF=) (normative). |
| # Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 |
| # and 3GPP 27.007 |
| class HfFeature(enum.IntFlag): |
| EC_NR = 0x001 # Echo Cancel & Noise reduction |
| THREE_WAY_CALLING = 0x002 |
| CLI_PRESENTATION_CAPABILITY = 0x004 |
| VOICE_RECOGNITION_ACTIVATION = 0x008 |
| REMOTE_VOLUME_CONTROL = 0x010 |
| ENHANCED_CALL_STATUS = 0x020 |
| ENHANCED_CALL_CONTROL = 0x040 |
| CODEC_NEGOTIATION = 0x080 |
| HF_INDICATORS = 0x100 |
| ESCO_S4_SETTINGS_SUPPORTED = 0x200 |
| ENHANCED_VOICE_RECOGNITION_STATUS = 0x400 |
| VOICE_RECOGNITION_TEST = 0x800 |
| |
| |
| # AG supported features (+BRSF:) (normative). |
| # Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 |
| # and 3GPP 27.007 |
| class AgFeature(enum.IntFlag): |
| THREE_WAY_CALLING = 0x001 |
| EC_NR = 0x002 # Echo Cancel & Noise reduction |
| VOICE_RECOGNITION_FUNCTION = 0x004 |
| IN_BAND_RING_TONE_CAPABILITY = 0x008 |
| VOICE_TAG = 0x010 # Attach a number to voice tag |
| REJECT_CALL = 0x020 # Ability to reject a call |
| ENHANCED_CALL_STATUS = 0x040 |
| ENHANCED_CALL_CONTROL = 0x080 |
| EXTENDED_ERROR_RESULT_CODES = 0x100 |
| CODEC_NEGOTIATION = 0x200 |
| HF_INDICATORS = 0x400 |
| ESCO_S4_SETTINGS_SUPPORTED = 0x800 |
| ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000 |
| VOICE_RECOGNITION_TEST = 0x2000 |
| |
| |
| # Audio Codec IDs (normative). |
| # Hands-Free Profile v1.8, 10 Appendix B |
| class AudioCodec(enum.IntEnum): |
| CVSD = 0x01 # Support for CVSD audio codec |
| MSBC = 0x02 # Support for mSBC audio codec |
| |
| |
| # HF Indicators (normative). |
| # Bluetooth Assigned Numbers, 6.10.1 HF Indicators |
| class HfIndicator(enum.IntEnum): |
| ENHANCED_SAFETY = 0x01 # Enhanced safety feature |
| BATTERY_LEVEL = 0x02 # Battery level feature |
| |
| |
| # Call Hold supported operations (normative). |
| # AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services |
| class CallHoldOperation(enum.IntEnum): |
| RELEASE_ALL_HELD_CALLS = 0 # Release all held calls |
| RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other |
| HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other |
| ADD_HELD_CALL = 3 # Adds a held call to conversation |
| |
| |
| # Response Hold status (normative). |
| # Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 |
| # and 3GPP 27.007 |
| class ResponseHoldStatus(enum.IntEnum): |
| INC_CALL_HELD = 0 # Put incoming call on hold |
| HELD_CALL_ACC = 1 # Accept a held incoming call |
| HELD_CALL_REJ = 2 # Reject a held incoming call |
| |
| |
| # Values for the Call Setup AG indicator (normative). |
| # Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 |
| # and 3GPP 27.007 |
| class CallSetupAgIndicator(enum.IntEnum): |
| NOT_IN_CALL_SETUP = 0 |
| INCOMING_CALL_PROCESS = 1 |
| OUTGOING_CALL_SETUP = 2 |
| REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call |
| |
| |
| # Values for the Call Held AG indicator (normative). |
| # Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07 |
| # and 3GPP 27.007 |
| class CallHeldAgIndicator(enum.IntEnum): |
| NO_CALLS_HELD = 0 |
| # Call is placed on hold or active/held calls swapped |
| # (The AG has both an active AND a held call) |
| CALL_ON_HOLD_AND_ACTIVE_CALL = 1 |
| CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call |
| |
| |
| # Call Info direction (normative). |
| # AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls |
| class CallInfoDirection(enum.IntEnum): |
| MOBILE_ORIGINATED_CALL = 0 |
| MOBILE_TERMINATED_CALL = 1 |
| |
| |
| # Call Info status (normative). |
| # AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls |
| class CallInfoStatus(enum.IntEnum): |
| ACTIVE = 0 |
| HELD = 1 |
| DIALING = 2 |
| ALERTING = 3 |
| INCOMING = 4 |
| WAITING = 5 |
| |
| |
| # Call Info mode (normative). |
| # AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls |
| class CallInfoMode(enum.IntEnum): |
| VOICE = 0 |
| DATA = 1 |
| FAX = 2 |
| UNKNOWN = 9 |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Hands-Free Control Interoperability Requirements |
| # ----------------------------------------------------------------------------- |
| |
| # Response codes. |
| RESPONSE_CODES = [ |
| "+APLSIRI", |
| "+BAC", |
| "+BCC", |
| "+BCS", |
| "+BIA", |
| "+BIEV", |
| "+BIND", |
| "+BINP", |
| "+BLDN", |
| "+BRSF", |
| "+BTRH", |
| "+BVRA", |
| "+CCWA", |
| "+CHLD", |
| "+CHUP", |
| "+CIND", |
| "+CLCC", |
| "+CLIP", |
| "+CMEE", |
| "+CMER", |
| "+CNUM", |
| "+COPS", |
| "+IPHONEACCEV", |
| "+NREC", |
| "+VGM", |
| "+VGS", |
| "+VTS", |
| "+XAPL", |
| "A", |
| "D", |
| ] |
| |
| # Unsolicited responses and statuses. |
| UNSOLICITED_CODES = [ |
| "+APLSIRI", |
| "+BCS", |
| "+BIND", |
| "+BSIR", |
| "+BTRH", |
| "+BVRA", |
| "+CCWA", |
| "+CIEV", |
| "+CLIP", |
| "+VGM", |
| "+VGS", |
| "BLACKLISTED", |
| "BUSY", |
| "DELAYED", |
| "NO ANSWER", |
| "NO CARRIER", |
| "RING", |
| ] |
| |
| # Status codes |
| STATUS_CODES = [ |
| "+CME ERROR", |
| "BLACKLISTED", |
| "BUSY", |
| "DELAYED", |
| "ERROR", |
| "NO ANSWER", |
| "NO CARRIER", |
| "OK", |
| ] |
| |
| |
| @dataclasses.dataclass |
| class Configuration: |
| supported_hf_features: List[HfFeature] |
| supported_hf_indicators: List[HfIndicator] |
| supported_audio_codecs: List[AudioCodec] |
| |
| |
| class AtResponseType(enum.Enum): |
| """Indicate if a response is expected from an AT command, and if multiple |
| responses are accepted.""" |
| |
| NONE = 0 |
| SINGLE = 1 |
| MULTIPLE = 2 |
| |
| |
| class AtResponse: |
| code: str |
| parameters: list |
| |
| def __init__(self, response: bytearray): |
| code_and_parameters = response.split(b':') |
| parameters = ( |
| code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray() |
| ) |
| self.code = code_and_parameters[0].decode() |
| self.parameters = at.parse_parameters(parameters) |
| |
| |
| @dataclasses.dataclass |
| class AgIndicatorState: |
| description: str |
| index: int |
| supported_values: Set[int] |
| current_status: int |
| |
| |
| @dataclasses.dataclass |
| class HfIndicatorState: |
| supported: bool = False |
| enabled: bool = False |
| |
| |
| class HfProtocol: |
| """Implementation for the Hands-Free side of the Hands-Free profile. |
| Reference specification Hands-Free Profile v1.8""" |
| |
| supported_hf_features: int |
| supported_audio_codecs: List[AudioCodec] |
| |
| supported_ag_features: int |
| supported_ag_call_hold_operations: List[CallHoldOperation] |
| |
| ag_indicators: List[AgIndicatorState] |
| hf_indicators: Dict[HfIndicator, HfIndicatorState] |
| |
| dlc: rfcomm.DLC |
| command_lock: asyncio.Lock |
| if TYPE_CHECKING: |
| response_queue: asyncio.Queue[AtResponse] |
| unsolicited_queue: asyncio.Queue[AtResponse] |
| else: |
| response_queue: asyncio.Queue |
| unsolicited_queue: asyncio.Queue |
| read_buffer: bytearray |
| |
| def __init__(self, dlc: rfcomm.DLC, configuration: Configuration): |
| # Configure internal state. |
| self.dlc = dlc |
| self.command_lock = asyncio.Lock() |
| self.response_queue = asyncio.Queue() |
| self.unsolicited_queue = asyncio.Queue() |
| self.read_buffer = bytearray() |
| |
| # Build local features. |
| self.supported_hf_features = sum(configuration.supported_hf_features) |
| self.supported_audio_codecs = configuration.supported_audio_codecs |
| |
| self.hf_indicators = { |
| indicator: HfIndicatorState() |
| for indicator in configuration.supported_hf_indicators |
| } |
| |
| # Clear remote features. |
| self.supported_ag_features = 0 |
| self.supported_ag_call_hold_operations = [] |
| self.ag_indicators = [] |
| |
| # Bind the AT reader to the RFCOMM channel. |
| self.dlc.sink = self._read_at |
| |
| def supports_hf_feature(self, feature: HfFeature) -> bool: |
| return (self.supported_hf_features & feature) != 0 |
| |
| def supports_ag_feature(self, feature: AgFeature) -> bool: |
| return (self.supported_ag_features & feature) != 0 |
| |
| # Read AT messages from the RFCOMM channel. |
| # Enqueue AT commands, responses, unsolicited responses to their |
| # respective queues, and set the corresponding event. |
| def _read_at(self, data: bytes): |
| # Append to the read buffer. |
| self.read_buffer.extend(data) |
| |
| # Locate header and trailer. |
| header = self.read_buffer.find(b'\r\n') |
| trailer = self.read_buffer.find(b'\r\n', header + 2) |
| if header == -1 or trailer == -1: |
| return |
| |
| # Isolate the AT response code and parameters. |
| raw_response = self.read_buffer[header + 2 : trailer] |
| response = AtResponse(raw_response) |
| logger.debug(f"<<< {raw_response.decode()}") |
| |
| # Consume the response bytes. |
| self.read_buffer = self.read_buffer[trailer + 2 :] |
| |
| # Forward the received code to the correct queue. |
| if self.command_lock.locked() and ( |
| response.code in STATUS_CODES or response.code in RESPONSE_CODES |
| ): |
| self.response_queue.put_nowait(response) |
| elif response.code in UNSOLICITED_CODES: |
| self.unsolicited_queue.put_nowait(response) |
| else: |
| logger.warning(f"dropping unexpected response with code '{response.code}'") |
| |
| # Send an AT command and wait for the peer response. |
| # Wait for the AT responses sent by the peer, to the status code. |
| # Raises asyncio.TimeoutError if the status is not received |
| # after a timeout (default 1 second). |
| # Raises ProtocolError if the status is not OK. |
| async def execute_command( |
| self, |
| cmd: str, |
| timeout: float = 1.0, |
| response_type: AtResponseType = AtResponseType.NONE, |
| ) -> Union[None, AtResponse, List[AtResponse]]: |
| async with self.command_lock: |
| logger.debug(f">>> {cmd}") |
| self.dlc.write(cmd + '\r') |
| responses: List[AtResponse] = [] |
| |
| while True: |
| result = await asyncio.wait_for( |
| self.response_queue.get(), timeout=timeout |
| ) |
| if result.code == 'OK': |
| if response_type == AtResponseType.SINGLE and len(responses) != 1: |
| raise HfpProtocolError("NO ANSWER") |
| |
| if response_type == AtResponseType.MULTIPLE: |
| return responses |
| if response_type == AtResponseType.SINGLE: |
| return responses[0] |
| return None |
| if result.code in STATUS_CODES: |
| raise HfpProtocolError(result.code) |
| responses.append(result) |
| |
| # 4.2.1 Service Level Connection Initialization. |
| async def initiate_slc(self): |
| # 4.2.1.1 Supported features exchange |
| # First, in the initialization procedure, the HF shall send the |
| # AT+BRSF=<HF supported features> command to the AG to both notify |
| # the AG of the supported features in the HF, as well as to retrieve the |
| # supported features in the AG using the +BRSF result code. |
| response = await self.execute_command( |
| f"AT+BRSF={self.supported_hf_features}", response_type=AtResponseType.SINGLE |
| ) |
| |
| self.supported_ag_features = int(response.parameters[0]) |
| logger.info(f"supported AG features: {self.supported_ag_features}") |
| for feature in AgFeature: |
| if self.supports_ag_feature(feature): |
| logger.info(f" - {feature.name}") |
| |
| # 4.2.1.2 Codec Negotiation |
| # Secondly, in the initialization procedure, if the HF supports the |
| # Codec Negotiation feature, it shall check if the AT+BRSF command |
| # response from the AG has indicated that it supports the Codec |
| # Negotiation feature. |
| if self.supports_hf_feature( |
| HfFeature.CODEC_NEGOTIATION |
| ) and self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION): |
| # If both the HF and AG do support the Codec Negotiation feature |
| # then the HF shall send the AT+BAC=<HF available codecs> command to |
| # the AG to notify the AG of the available codecs in the HF. |
| codecs = [str(c) for c in self.supported_audio_codecs] |
| await self.execute_command(f"AT+BAC={','.join(codecs)}") |
| |
| # 4.2.1.3 AG Indicators |
| # After having retrieved the supported features in the AG, the HF shall |
| # determine which indicators are supported by the AG, as well as the |
| # ordering of the supported indicators. This is because, according to |
| # the 3GPP 27.007 specification [2], the AG may support additional |
| # indicators not provided for by the Hands-Free Profile, and because the |
| # ordering of the indicators is implementation specific. The HF uses |
| # the AT+CIND=? Test command to retrieve information about the supported |
| # indicators and their ordering. |
| response = await self.execute_command( |
| "AT+CIND=?", response_type=AtResponseType.SINGLE |
| ) |
| |
| self.ag_indicators = [] |
| for index, indicator in enumerate(response.parameters): |
| description = indicator[0].decode() |
| supported_values = [] |
| for value in indicator[1]: |
| value = value.split(b'-') |
| value = [int(v) for v in value] |
| value_min = value[0] |
| value_max = value[1] if len(value) > 1 else value[0] |
| supported_values.extend([v for v in range(value_min, value_max + 1)]) |
| |
| self.ag_indicators.append( |
| AgIndicatorState(description, index, set(supported_values), 0) |
| ) |
| |
| # Once the HF has the necessary supported indicator and ordering |
| # information, it shall retrieve the current status of the indicators |
| # in the AG using the AT+CIND? Read command. |
| response = await self.execute_command( |
| "AT+CIND?", response_type=AtResponseType.SINGLE |
| ) |
| |
| for index, indicator in enumerate(response.parameters): |
| self.ag_indicators[index].current_status = int(indicator) |
| |
| # After having retrieved the status of the indicators in the AG, the HF |
| # shall then enable the "Indicators status update" function in the AG by |
| # issuing the AT+CMER command, to which the AG shall respond with OK. |
| await self.execute_command("AT+CMER=3,,,1") |
| |
| if self.supports_hf_feature( |
| HfFeature.THREE_WAY_CALLING |
| ) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING): |
| # After the HF has enabled the “Indicators status update” function in |
| # the AG, and if the “Call waiting and 3-way calling” bit was set in the |
| # supported features bitmap by both the HF and the AG, the HF shall |
| # issue the AT+CHLD=? test command to retrieve the information about how |
| # the call hold and multiparty services are supported in the AG. The HF |
| # shall not issue the AT+CHLD=? test command in case either the HF or |
| # the AG does not support the "Three-way calling" feature. |
| response = await self.execute_command( |
| "AT+CHLD=?", response_type=AtResponseType.SINGLE |
| ) |
| |
| self.supported_ag_call_hold_operations = [ |
| CallHoldOperation(int(operation)) |
| for operation in response.parameters[0] |
| if not b'x' in operation |
| ] |
| |
| # 4.2.1.4 HF Indicators |
| # If the HF supports the HF indicator feature, it shall check the +BRSF |
| # response to see if the AG also supports the HF Indicator feature. |
| if self.supports_hf_feature( |
| HfFeature.HF_INDICATORS |
| ) and self.supports_ag_feature(AgFeature.HF_INDICATORS): |
| # If both the HF and AG support the HF Indicator feature, then the HF |
| # shall send the AT+BIND=<HF supported HF indicators> command to the AG |
| # to notify the AG of the supported indicators’ assigned numbers in the |
| # HF. The AG shall respond with OK |
| indicators = [str(i) for i in self.hf_indicators.keys()] |
| await self.execute_command(f"AT+BIND={','.join(indicators)}") |
| |
| # After having provided the AG with the HF indicators it supports, |
| # the HF shall send the AT+BIND=? to request HF indicators supported |
| # by the AG. The AG shall reply with the +BIND response listing all |
| # HF indicators that it supports followed by an OK. |
| response = await self.execute_command( |
| "AT+BIND=?", response_type=AtResponseType.SINGLE |
| ) |
| |
| logger.info("supported HF indicators:") |
| for indicator in response.parameters[0]: |
| indicator = HfIndicator(int(indicator)) |
| logger.info(f" - {indicator.name}") |
| if indicator in self.hf_indicators: |
| self.hf_indicators[indicator].supported = True |
| |
| # Once the HF receives the supported HF indicators list from the AG, |
| # the HF shall send the AT+BIND? command to determine which HF |
| # indicators are enabled. The AG shall respond with one or more |
| # +BIND responses. The AG shall terminate the list with OK. |
| # (See Section 4.36.1.3). |
| responses = await self.execute_command( |
| "AT+BIND?", response_type=AtResponseType.MULTIPLE |
| ) |
| |
| logger.info("enabled HF indicators:") |
| for response in responses: |
| indicator = HfIndicator(int(response.parameters[0])) |
| enabled = int(response.parameters[1]) != 0 |
| logger.info(f" - {indicator.name}: {enabled}") |
| if indicator in self.hf_indicators: |
| self.hf_indicators[indicator].enabled = True |
| |
| logger.info("SLC setup completed") |
| |
| # 4.11.2 Audio Connection Setup by HF |
| async def setup_audio_connection(self): |
| # When the HF triggers the establishment of the Codec Connection it |
| # shall send the AT command AT+BCC to the AG. The AG shall respond with |
| # OK if it will start the Codec Connection procedure, and with ERROR |
| # if it cannot start the Codec Connection procedure. |
| await self.execute_command("AT+BCC") |
| |
| # 4.11.3 Codec Connection Setup |
| async def setup_codec_connection(self, codec_id: int): |
| # The AG shall send a +BCS=<Codec ID> unsolicited response to the HF. |
| # The HF shall then respond to the incoming unsolicited response with |
| # the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the |
| # unsolicited response code as long as the ID is supported. |
| # If the received ID is not available, the HF shall respond with |
| # AT+BAC with its available codecs. |
| if codec_id not in self.supported_audio_codecs: |
| codecs = [str(c) for c in self.supported_audio_codecs] |
| await self.execute_command(f"AT+BAC={','.join(codecs)}") |
| return |
| |
| await self.execute_command(f"AT+BCS={codec_id}") |
| |
| # After sending the OK response, the AG shall open the |
| # Synchronous Connection with the settings that are determined by the |
| # ID. The HF shall be ready to accept the synchronous connection |
| # establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>. |
| |
| logger.info("codec connection setup completed") |
| |
| # 4.13.1 Answer Incoming Call from the HF – In-Band Ringing |
| async def answer_incoming_call(self): |
| # The user accepts the incoming voice call by using the proper means |
| # provided by the HF. The HF shall then send the ATA command |
| # (see Section 4.34) to the AG. The AG shall then begin the procedure for |
| # accepting the incoming call. |
| await self.execute_command("ATA") |
| |
| # 4.14.1 Reject an Incoming Call from the HF |
| async def reject_incoming_call(self): |
| # The user rejects the incoming call by using the User Interface on the |
| # Hands-Free unit. The HF shall then send the AT+CHUP command |
| # (see Section 4.34) to the AG. This may happen at any time during the |
| # procedures described in Sections 4.13.1 and 4.13.2. |
| await self.execute_command("AT+CHUP") |
| |
| # 4.15.1 Terminate a Call Process from the HF |
| async def terminate_call(self): |
| # The user may abort the ongoing call process using whatever means |
| # provided by the Hands-Free unit. The HF shall send AT+CHUP command |
| # (see Section 4.34) to the AG, and the AG shall then start the |
| # procedure to terminate or interrupt the current call procedure. |
| # The AG shall then send the OK indication followed by the +CIEV result |
| # code, with the value indicating (call=0). |
| await self.execute_command("AT+CHUP") |
| |
| async def update_ag_indicator(self, index: int, value: int): |
| self.ag_indicators[index].current_status = value |
| logger.info( |
| f"AG indicator updated: {self.ag_indicators[index].description}, {value}" |
| ) |
| |
| async def handle_unsolicited(self): |
| """Handle unsolicited result codes sent by the audio gateway.""" |
| result = await self.unsolicited_queue.get() |
| if result.code == "+BCS": |
| await self.setup_codec_connection(int(result.parameters[0])) |
| elif result.code == "+CIEV": |
| await self.update_ag_indicator( |
| int(result.parameters[0]), int(result.parameters[1]) |
| ) |
| else: |
| logging.info(f"unhandled unsolicited response {result.code}") |
| |
| async def run(self): |
| """Main rountine for the Hands-Free side of the HFP protocol. |
| Initiates the service level connection then loops handling |
| unsolicited AG responses.""" |
| |
| try: |
| await self.initiate_slc() |
| while True: |
| await self.handle_unsolicited() |
| except Exception: |
| logger.error("HFP-HF protocol failed with the following error:") |
| logger.error(traceback.format_exc()) |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Normative SDP definitions |
| # ----------------------------------------------------------------------------- |
| |
| |
| # Profile version (normative). |
| # Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements |
| class ProfileVersion(enum.IntEnum): |
| V1_5 = 0x0105 |
| V1_6 = 0x0106 |
| V1_7 = 0x0107 |
| V1_8 = 0x0108 |
| V1_9 = 0x0109 |
| |
| |
| # HF supported features (normative). |
| # Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements |
| class HfSdpFeature(enum.IntFlag): |
| EC_NR = 0x01 # Echo Cancel & Noise reduction |
| THREE_WAY_CALLING = 0x02 |
| CLI_PRESENTATION_CAPABILITY = 0x04 |
| VOICE_RECOGNITION_ACTIVATION = 0x08 |
| REMOTE_VOLUME_CONTROL = 0x10 |
| WIDE_BAND = 0x20 # Wide band speech |
| ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 |
| VOICE_RECOGNITION_TEST = 0x80 |
| |
| |
| # AG supported features (normative). |
| # Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements |
| class AgSdpFeature(enum.IntFlag): |
| THREE_WAY_CALLING = 0x01 |
| EC_NR = 0x02 # Echo Cancel & Noise reduction |
| VOICE_RECOGNITION_FUNCTION = 0x04 |
| IN_BAND_RING_TONE_CAPABILITY = 0x08 |
| VOICE_TAG = 0x10 # Attach a number to voice tag |
| WIDE_BAND = 0x20 # Wide band speech |
| ENHANCED_VOICE_RECOGNITION_STATUS = 0x40 |
| VOICE_RECOGNITION_TEST = 0x80 |
| |
| |
| def sdp_records( |
| service_record_handle: int, rfcomm_channel: int, configuration: Configuration |
| ) -> List[ServiceAttribute]: |
| """Generate the SDP record for HFP Hands-Free support. |
| The record exposes the features supported in the input configuration, |
| and the allocated RFCOMM channel.""" |
| |
| hf_supported_features = 0 |
| |
| if HfFeature.EC_NR in configuration.supported_hf_features: |
| hf_supported_features |= HfSdpFeature.EC_NR |
| if HfFeature.THREE_WAY_CALLING in configuration.supported_hf_features: |
| hf_supported_features |= HfSdpFeature.THREE_WAY_CALLING |
| if HfFeature.CLI_PRESENTATION_CAPABILITY in configuration.supported_hf_features: |
| hf_supported_features |= HfSdpFeature.CLI_PRESENTATION_CAPABILITY |
| if HfFeature.VOICE_RECOGNITION_ACTIVATION in configuration.supported_hf_features: |
| hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_ACTIVATION |
| if HfFeature.REMOTE_VOLUME_CONTROL in configuration.supported_hf_features: |
| hf_supported_features |= HfSdpFeature.REMOTE_VOLUME_CONTROL |
| if ( |
| HfFeature.ENHANCED_VOICE_RECOGNITION_STATUS |
| in configuration.supported_hf_features |
| ): |
| hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS |
| if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features: |
| hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST |
| |
| if AudioCodec.MSBC in configuration.supported_audio_codecs: |
| hf_supported_features |= HfSdpFeature.WIDE_BAND |
| |
| return [ |
| ServiceAttribute( |
| SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, |
| DataElement.unsigned_integer_32(service_record_handle), |
| ), |
| ServiceAttribute( |
| SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, |
| DataElement.sequence( |
| [ |
| DataElement.uuid(BT_HANDSFREE_SERVICE), |
| DataElement.uuid(BT_GENERIC_AUDIO_SERVICE), |
| ] |
| ), |
| ), |
| ServiceAttribute( |
| SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| DataElement.sequence( |
| [ |
| DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]), |
| DataElement.sequence( |
| [ |
| DataElement.uuid(BT_RFCOMM_PROTOCOL_ID), |
| DataElement.unsigned_integer_8(rfcomm_channel), |
| ] |
| ), |
| ] |
| ), |
| ), |
| ServiceAttribute( |
| SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, |
| DataElement.sequence( |
| [ |
| DataElement.sequence( |
| [ |
| DataElement.uuid(BT_HANDSFREE_SERVICE), |
| DataElement.unsigned_integer_16(ProfileVersion.V1_8), |
| ] |
| ) |
| ] |
| ), |
| ), |
| ServiceAttribute( |
| SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, |
| DataElement.unsigned_integer_16(hf_supported_features), |
| ), |
| ] |