hfp: Implement initiate SLC procedure for HFP-HF
diff --git a/bumble/at.py b/bumble/at.py
new file mode 100644
index 0000000..78a4b08
--- /dev/null
+++ b/bumble/at.py
@@ -0,0 +1,85 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Union
+
+
+def tokenize_parameters(buffer: bytes) -> List[bytes]:
+ """Split input parameters into tokens.
+ Removes space characters outside of double quote blocks:
+ T-rec-V-25 - 5.2.1 Command line general format: "Space characters (IA5 2/0)
+ are ignored [..], unless they are embedded in numeric or string constants"
+ Raises ValueError in case of invalid input string."""
+
+ tokens = []
+ in_quotes = False
+ token = bytearray()
+ for b in buffer:
+ char = bytearray([b])
+
+ if in_quotes:
+ token.extend(char)
+ if char == b'\"':
+ in_quotes = False
+ tokens.append(token[1:-1])
+ token = bytearray()
+ else:
+ if char == b' ':
+ pass
+ elif char == b',' or char == b')':
+ tokens.append(token)
+ tokens.append(char)
+ token = bytearray()
+ elif char == b'(':
+ if len(token) > 0:
+ raise ValueError("open_paren following regular character")
+ tokens.append(char)
+ elif char == b'"':
+ if len(token) > 0:
+ raise ValueError("quote following regular character")
+ in_quotes = True
+ token.extend(char)
+ else:
+ token.extend(char)
+
+ tokens.append(token)
+ return [bytes(token) for token in tokens if len(token) > 0]
+
+
+def parse_parameters(buffer: bytes) -> List[Union[bytes, list]]:
+ """Parse the parameters using the comma and parenthesis separators.
+ Raises ValueError in case of invalid input string."""
+
+ tokens = tokenize_parameters(buffer)
+ accumulator: List[list] = [[]]
+ current: Union[bytes, list] = bytes()
+
+ for token in tokens:
+ if token == b',':
+ accumulator[-1].append(current)
+ current = bytes()
+ elif token == b'(':
+ accumulator.append([])
+ elif token == b')':
+ if len(accumulator) < 2:
+ raise ValueError("close_paren without matching open_paren")
+ accumulator[-1].append(current)
+ current = accumulator.pop()
+ else:
+ current = token
+
+ accumulator[-1].append(current)
+ if len(accumulator) > 1:
+ raise ValueError("missing close_paren")
+ return accumulator[0]
diff --git a/bumble/hfp.py b/bumble/hfp.py
index 9080a55..6d9e428 100644
--- a/bumble/hfp.py
+++ b/bumble/hfp.py
@@ -1,4 +1,4 @@
-# Copyright 2021-2022 Google LLC
+# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,11 +17,31 @@
# -----------------------------------------------------------------------------
import logging
import asyncio
-import collections
-from typing import Union
+import dataclasses
+import enum
+import traceback
+from typing import Dict, List, Union, Set
+from . import at
from . import rfcomm
-from .colors import color
+
+from bumble.core import (
+ ProtocolError,
+ BT_GENERIC_AUDIO_SERVICE,
+ BT_HANDSFREE_SERVICE,
+ BT_L2CAP_PROTOCOL_ID,
+ BT_RFCOMM_PROTOCOL_ID,
+)
+from bumble.sdp import (
+ DataElement,
+ ServiceAttribute,
+ SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
+ SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
+ SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
+ SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
+ SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
+)
+
# -----------------------------------------------------------------------------
# Logging
@@ -30,72 +50,700 @@
# -----------------------------------------------------------------------------
-# Protocol Support
+# Normative protocol definitions
# -----------------------------------------------------------------------------
+
+# HF supported features (AT+BRSF=) (normative).
+# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
+# and 3GPP 27.007
+class HfFeature(enum.IntFlag):
+ EC_NR = 0x001 # Echo Cancel & Noise reduction
+ THREE_WAY_CALLING = 0x002
+ CLI_PRESENTATION_CAPABILITY = 0x004
+ VOICE_RECOGNITION_ACTIVATION = 0x008
+ REMOTE_VOLUME_CONTROL = 0x010
+ ENHANCED_CALL_STATUS = 0x020
+ ENHANCED_CALL_CONTROL = 0x040
+ CODEC_NEGOTIATION = 0x080
+ HF_INDICATORS = 0x100
+ ESCO_S4_SETTINGS_SUPPORTED = 0x200
+ ENHANCED_VOICE_RECOGNITION_STATUS = 0x400
+ VOICE_RECOGNITION_TEST = 0x800
+
+
+# AG supported features (+BRSF:) (normative).
+# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
+# and 3GPP 27.007
+class AgFeature(enum.IntFlag):
+ THREE_WAY_CALLING = 0x001
+ EC_NR = 0x002 # Echo Cancel & Noise reduction
+ VOICE_RECOGNITION_FUNCTION = 0x004
+ IN_BAND_RING_TONE_CAPABILITY = 0x008
+ VOICE_TAG = 0x010 # Attach a number to voice tag
+ REJECT_CALL = 0x020 # Ability to reject a call
+ ENHANCED_CALL_STATUS = 0x040
+ ENHANCED_CALL_CONTROL = 0x080
+ EXTENDED_ERROR_RESULT_CODES = 0x100
+ CODEC_NEGOTIATION = 0x200
+ HF_INDICATORS = 0x400
+ ESCO_S4_SETTINGS_SUPPORTED = 0x800
+ ENHANCED_VOICE_RECOGNITION_STATUS = 0x1000
+ VOICE_RECOGNITION_TEST = 0x2000
+
+
+# Audio Codec IDs (normative).
+# Hands-Free Profile v1.8, 10 Appendix B
+class AudioCodec(enum.IntEnum):
+ CVSD = 0x01 # Support for CVSD audio codec
+ MSBC = 0x02 # Support for mSBC audio codec
+
+
+# HF Indicators (normative).
+# Bluetooth Assigned Numbers, 6.10.1 HF Indicators
+class HfIndicator(enum.IntEnum):
+ ENHANCED_SAFETY = 0x01 # Enhanced safety feature
+ BATTERY_LEVEL = 0x02 # Battery level feature
+
+
+# Call Hold supported operations (normative).
+# AT Commands Reference Guide, 3.5.2.3.12 +CHLD - Call Holding Services
+class CallHoldOperation(enum.IntEnum):
+ RELEASE_ALL_HELD_CALLS = 0 # Release all held calls
+ RELEASE_ALL_ACTIVE_CALLS = 1 # Release all active calls, accept other
+ HOLD_ALL_ACTIVE_CALLS = 2 # Place all active calls on hold, accept other
+ ADD_HELD_CALL = 3 # Adds a held call to conversation
+
+
+# Response Hold status (normative).
+# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
+# and 3GPP 27.007
+class ResponseHoldStatus(enum.IntEnum):
+ INC_CALL_HELD = 0 # Put incoming call on hold
+ HELD_CALL_ACC = 1 # Accept a held incoming call
+ HELD_CALL_REJ = 2 # Reject a held incoming call
+
+
+# Values for the Call Setup AG indicator (normative).
+# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
+# and 3GPP 27.007
+class CallSetupAgIndicator(enum.IntEnum):
+ NOT_IN_CALL_SETUP = 0
+ INCOMING_CALL_PROCESS = 1
+ OUTGOING_CALL_SETUP = 2
+ REMOTE_ALERTED = 3 # Remote party alerted in an outgoing call
+
+
+# Values for the Call Held AG indicator (normative).
+# Hands-Free Profile v1.8, 4.34.2, AT Capabilities Re-Used from GSM 07.07
+# and 3GPP 27.007
+class CallHeldAgIndicator(enum.IntEnum):
+ NO_CALLS_HELD = 0
+ # Call is placed on hold or active/held calls swapped
+ # (The AG has both an active AND a held call)
+ CALL_ON_HOLD_AND_ACTIVE_CALL = 1
+ CALL_ON_HOLD_NO_ACTIVE_CALL = 2 # Call on hold, no active call
+
+
+# Call Info direction (normative).
+# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
+class CallInfoDirection(enum.IntEnum):
+ MOBILE_ORIGINATED_CALL = 0
+ MOBILE_TERMINATED_CALL = 1
+
+
+# Call Info status (normative).
+# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
+class CallInfoStatus(enum.IntEnum):
+ ACTIVE = 0
+ HELD = 1
+ DIALING = 2
+ ALERTING = 3
+ INCOMING = 4
+ WAITING = 5
+
+
+# Call Info mode (normative).
+# AT Commands Reference Guide, 3.5.2.3.15 +CLCC - List Current Calls
+class CallInfoMode(enum.IntEnum):
+ VOICE = 0
+ DATA = 1
+ FAX = 2
+ UNKNOWN = 9
+
+
# -----------------------------------------------------------------------------
-class HfpProtocol:
+# Hands-Free Control Interoperability Requirements
+# -----------------------------------------------------------------------------
+
+# Response codes.
+RESPONSE_CODES = [
+ "+APLSIRI",
+ "+BAC",
+ "+BCC",
+ "+BCS",
+ "+BIA",
+ "+BIEV",
+ "+BIND",
+ "+BINP",
+ "+BLDN",
+ "+BRSF",
+ "+BTRH",
+ "+BVRA",
+ "+CCWA",
+ "+CHLD",
+ "+CHUP",
+ "+CIND",
+ "+CLCC",
+ "+CLIP",
+ "+CMEE",
+ "+CMER",
+ "+CNUM",
+ "+COPS",
+ "+IPHONEACCEV",
+ "+NREC",
+ "+VGM",
+ "+VGS",
+ "+VTS",
+ "+XAPL",
+ "A",
+ "D",
+]
+
+# Unsolicited responses and statuses.
+UNSOLICITED_CODES = [
+ "+APLSIRI",
+ "+BCS",
+ "+BIND",
+ "+BSIR",
+ "+BTRH",
+ "+BVRA",
+ "+CCWA",
+ "+CIEV",
+ "+CLIP",
+ "+VGM",
+ "+VGS",
+ "BLACKLISTED",
+ "BUSY",
+ "DELAYED",
+ "NO ANSWER",
+ "NO CARRIER",
+ "RING",
+]
+
+# Status codes
+STATUS_CODES = [
+ "+CME ERROR",
+ "BLACKLISTED",
+ "BUSY",
+ "DELAYED",
+ "ERROR",
+ "NO ANSWER",
+ "NO CARRIER",
+ "OK",
+]
+
+
[email protected]
+class Configuration:
+ supported_hf_features: List[HfFeature]
+ supported_hf_indicators: List[HfIndicator]
+ supported_audio_codecs: List[AudioCodec]
+
+
+class AtResponseType(enum.Enum):
+ """Indicate if a response is expected from an AT command, and if multiple
+ responses are accepted."""
+
+ NONE = 0
+ SINGLE = 1
+ MULTIPLE = 2
+
+
+class AtResponse:
+ code: str
+ parameters: list
+
+ def __init__(self, response: bytearray):
+ code_and_parameters = response.split(b':')
+ parameters = (
+ code_and_parameters[1] if len(code_and_parameters) > 1 else bytearray()
+ )
+ self.code = code_and_parameters[0].decode()
+ self.parameters = at.parse_parameters(parameters)
+
+
[email protected]
+class AgIndicatorState:
+ description: str
+ index: int
+ supported_values: Set[int]
+ current_status: int
+
+
[email protected]
+class HfIndicatorState:
+ supported: bool = False
+ enabled: bool = False
+
+
+class HfProtocol:
+ """Implementation for the Hands-Free side of the Hands-Free profile.
+ Reference specification Hands-Free Profile v1.8"""
+
+ supported_hf_features: int
+ supported_audio_codecs: List[AudioCodec]
+
+ supported_ag_features: int
+ supported_ag_call_hold_operations: List[CallHoldOperation]
+
+ ag_indicators: List[AgIndicatorState]
+ hf_indicators: Dict[HfIndicator, HfIndicatorState]
+
dlc: rfcomm.DLC
- buffer: str
- lines: collections.deque
- lines_available: asyncio.Event
+ command_lock: asyncio.Lock
+ response_queue: asyncio.Queue
+ unsolicited_queue: asyncio.Queue
+ read_buffer: bytearray
- def __init__(self, dlc: rfcomm.DLC) -> None:
+ def __init__(self, dlc: rfcomm.DLC, configuration: Configuration):
+ # Configure internal state.
self.dlc = dlc
- self.buffer = ''
- self.lines = collections.deque()
- self.lines_available = asyncio.Event()
+ self.command_lock = asyncio.Lock()
+ self.response_queue = asyncio.Queue()
+ self.unsolicited_queue = asyncio.Queue()
+ self.read_buffer = bytearray()
- dlc.sink = self.feed
+ # Build local features.
+ self.supported_hf_features = sum(configuration.supported_hf_features)
+ self.supported_audio_codecs = configuration.supported_audio_codecs
- def feed(self, data: Union[bytes, str]) -> None:
- # Convert the data to a string if needed
- if isinstance(data, bytes):
- data = data.decode('utf-8')
+ self.hf_indicators = {
+ indicator: HfIndicatorState()
+ for indicator in configuration.supported_hf_indicators
+ }
- logger.debug(f'<<< Data received: {data}')
+ # Clear remote features.
+ self.supported_ag_features = 0
+ self.supported_ag_call_hold_operations = []
+ self.ag_indicators = []
- # Add to the buffer and look for lines
- self.buffer += data
- while (separator := self.buffer.find('\r')) >= 0:
- line = self.buffer[:separator].strip()
- self.buffer = self.buffer[separator + 1 :]
- if len(line) > 0:
- self.on_line(line)
+ # Bind the AT reader to the RFCOMM channel.
+ self.dlc.sink = self._read_at
- def on_line(self, line: str) -> None:
- self.lines.append(line)
- self.lines_available.set()
+ def supports_hf_feature(self, feature: HfFeature) -> bool:
+ return (self.supported_hf_features & feature) != 0
- def send_command_line(self, line: str) -> None:
- logger.debug(color(f'>>> {line}', 'yellow'))
- self.dlc.write(line + '\r')
+ def supports_ag_feature(self, feature: AgFeature) -> bool:
+ return (self.supported_ag_features & feature) != 0
- def send_response_line(self, line: str) -> None:
- logger.debug(color(f'>>> {line}', 'yellow'))
- self.dlc.write('\r\n' + line + '\r\n')
+ # Read AT messages from the RFCOMM channel.
+ # Enqueue AT commands, responses, unsolicited responses to their
+ # respective queues, and set the corresponding event.
+ def _read_at(self, data: bytes):
+ # Append to the read buffer.
+ self.read_buffer.extend(data)
- async def next_line(self) -> str:
- await self.lines_available.wait()
- line = self.lines.popleft()
- if not self.lines:
- self.lines_available.clear()
- logger.debug(color(f'<<< {line}', 'green'))
- return line
+ # Locate header and trailer.
+ header = self.read_buffer.find(b'\r\n')
+ trailer = self.read_buffer.find(b'\r\n', header + 2)
+ if header == -1 or trailer == -1:
+ return
- async def initialize_service(self) -> None:
- # Perform Service Level Connection Initialization
- self.send_command_line('AT+BRSF=2072') # Retrieve Supported Features
- await (self.next_line())
- await (self.next_line())
+ # Isolate the AT response code and parameters.
+ raw_response = self.read_buffer[header + 2 : trailer]
+ response = AtResponse(raw_response)
+ logger.debug(f"<<< {raw_response.decode()}")
- self.send_command_line('AT+CIND=?')
- await (self.next_line())
- await (self.next_line())
+ # Consume the response bytes.
+ self.read_buffer = self.read_buffer[trailer + 2 :]
- self.send_command_line('AT+CIND?')
- await (self.next_line())
- await (self.next_line())
+ # Forward the received code to the correct queue.
+ if self.command_lock.locked() and (
+ response.code in STATUS_CODES or response.code in RESPONSE_CODES
+ ):
+ self.response_queue.put_nowait(response)
+ elif response.code in UNSOLICITED_CODES:
+ self.unsolicited_queue.put_nowait(response)
+ else:
+ logger.warning(f"dropping unexpected response with code '{response.code}'")
- self.send_command_line('AT+CMER=3,0,0,1')
- await (self.next_line())
+ # Send an AT command and wait for the peer resposne.
+ # Wait for the AT responses sent by the peer, to the status code.
+ # Raises asyncio.TimeoutError if the status is not received
+ # after a timeout (default 1 second).
+ # Raises ProtocolError if the status is not OK.
+ async def execute_command(
+ self,
+ cmd: str,
+ timeout: float = 1.0,
+ response_type: AtResponseType = AtResponseType.NONE,
+ ) -> Union[None, AtResponse, List[AtResponse]]:
+ async with self.command_lock:
+ logger.debug(f">>> {cmd}")
+ self.dlc.write(cmd + '\r')
+ responses: List[AtResponse] = []
+
+ while True:
+ result = await asyncio.wait_for(
+ self.response_queue.get(), timeout=timeout
+ )
+ if result.code == 'OK':
+ if response_type == AtResponseType.SINGLE and len(responses) != 1:
+ raise ProtocolError("NO ANSWER")
+
+ if response_type == AtResponseType.MULTIPLE:
+ return responses
+ if response_type == AtResponseType.SINGLE:
+ return responses[0]
+ return None
+ if result.code in STATUS_CODES:
+ raise ProtocolError(result.code)
+ responses.append(result)
+
+ # 4.2.1 Service Level Connection Initialization.
+ async def initiate_slc(self):
+ # 4.2.1.1 Supported features exchange
+ # First, in the initialization procedure, the HF shall send the
+ # AT+BRSF=<HF supported features> command to the AG to both notify
+ # the AG of the supported features in the HF, as well as to retrieve the
+ # supported features in the AG using the +BRSF result code.
+ response = await self.execute_command(
+ f"AT+BRSF={self.supported_hf_features}", response_type=AtResponseType.SINGLE
+ )
+
+ self.supported_ag_features = int(response.parameters[0])
+ logger.info(f"supported AG features: {self.supported_ag_features}")
+ for feature in AgFeature:
+ if self.supports_ag_feature(feature):
+ logger.info(f" - {feature.name}")
+
+ # 4.2.1.2 Codec Negotiation
+ # Secondly, in the initialization procedure, if the HF supports the
+ # Codec Negotiation feature, it shall check if the AT+BRSF command
+ # response from the AG has indicated that it supports the Codec
+ # Negotiation feature.
+ if self.supports_hf_feature(
+ HfFeature.CODEC_NEGOTIATION
+ ) and self.supports_ag_feature(AgFeature.CODEC_NEGOTIATION):
+ # If both the HF and AG do support the Codec Negotiation feature
+ # then the HF shall send the AT+BAC=<HF available codecs> command to
+ # the AG to notify the AG of the available codecs in the HF.
+ codecs = [str(c) for c in self.supported_audio_codecs]
+ await self.execute_command(f"AT+BAC={','.join(codecs)}")
+
+ # 4.2.1.3 AG Indicators
+ # After having retrieved the supported features in the AG, the HF shall
+ # determine which indicators are supported by the AG, as well as the
+ # ordering of the supported indicators. This is because, according to
+ # the 3GPP 27.007 specification [2], the AG may support additional
+ # indicators not provided for by the Hands-Free Profile, and because the
+ # ordering of the indicators is implementation specific. The HF uses
+ # the AT+CIND=? Test command to retrieve information about the supported
+ # indicators and their ordering.
+ response = await self.execute_command(
+ "AT+CIND=?", response_type=AtResponseType.SINGLE
+ )
+
+ self.ag_indicators = []
+ for index, indicator in enumerate(response.parameters):
+ description = indicator[0].decode()
+ supported_values = []
+ for value in indicator[1]:
+ value = value.split(b'-')
+ value = [int(v) for v in value]
+ value_min = value[0]
+ value_max = value[1] if len(value) > 1 else value[0]
+ supported_values.extend([v for v in range(value_min, value_max + 1)])
+
+ self.ag_indicators.append(
+ AgIndicatorState(description, index, set(supported_values), 0)
+ )
+
+ # Once the HF has the necessary supported indicator and ordering
+ # information, it shall retrieve the current status of the indicators
+ # in the AG using the AT+CIND? Read command.
+ response = await self.execute_command(
+ "AT+CIND?", response_type=AtResponseType.SINGLE
+ )
+
+ for index, indicator in enumerate(response.parameters):
+ self.ag_indicators[index].current_status = int(indicator)
+
+ # After having retrieved the status of the indicators in the AG, the HF
+ # shall then enable the "Indicators status update" function in the AG by
+ # issuing the AT+CMER command, to which the AG shall respond with OK.
+ await self.execute_command("AT+CMER=3,,,1")
+
+ if self.supports_hf_feature(
+ HfFeature.THREE_WAY_CALLING
+ ) and self.supports_ag_feature(HfFeature.THREE_WAY_CALLING):
+ # After the HF has enabled the “Indicators status update” function in
+ # the AG, and if the “Call waiting and 3-way calling” bit was set in the
+ # supported features bitmap by both the HF and the AG, the HF shall
+ # issue the AT+CHLD=? test command to retrieve the information about how
+ # the call hold and multiparty services are supported in the AG. The HF
+ # shall not issue the AT+CHLD=? test command in case either the HF or
+ # the AG does not support the "Three-way calling" feature.
+ response = await self.execute_command(
+ "AT+CHLD=?", response_type=AtResponseType.SINGLE
+ )
+
+ self.supported_ag_call_hold_operations = [
+ CallHoldOperation(int(operation))
+ for operation in response.parameters[0]
+ if not b'x' in operation
+ ]
+
+ # 4.2.1.4 HF Indicators
+ # If the HF supports the HF indicator feature, it shall check the +BRSF
+ # response to see if the AG also supports the HF Indicator feature.
+ if self.supports_hf_feature(
+ HfFeature.HF_INDICATORS
+ ) and self.supports_ag_feature(AgFeature.HF_INDICATORS):
+ # If both the HF and AG support the HF Indicator feature, then the HF
+ # shall send the AT+BIND=<HF supported HF indicators> command to the AG
+ # to notify the AG of the supported indicators’ assigned numbers in the
+ # HF. The AG shall respond with OK
+ indicators = [str(i) for i in self.hf_indicators.keys()]
+ await self.execute_command(f"AT+BIND={','.join(indicators)}")
+
+ # After having provided the AG with the HF indicators it supports,
+ # the HF shall send the AT+BIND=? to request HF indicators supported
+ # by the AG. The AG shall reply with the +BIND response listing all
+ # HF indicators that it supports followed by an OK.
+ response = await self.execute_command(
+ "AT+BIND=?", response_type=AtResponseType.SINGLE
+ )
+
+ logger.info("supported HF indicators:")
+ for indicator in response.parameters[0]:
+ indicator = HfIndicator(int(indicator))
+ logger.info(f" - {indicator.name}")
+ if indicator in self.hf_indicators:
+ self.hf_indicators[indicator].supported = True
+
+ # Once the HF receives the supported HF indicators list from the AG,
+ # the HF shall send the AT+BIND? command to determine which HF
+ # indicators are enabled. The AG shall respond with one or more
+ # +BIND responses. The AG shall terminate the list with OK.
+ # (See Section 4.36.1.3).
+ responses = await self.execute_command(
+ "AT+BIND?", response_type=AtResponseType.MULTIPLE
+ )
+
+ logger.info("enabled HF indicators:")
+ for response in responses:
+ indicator = HfIndicator(int(response.parameters[0]))
+ enabled = int(response.parameters[1]) != 0
+ logger.info(f" - {indicator.name}: {enabled}")
+ if indicator in self.hf_indicators:
+ self.hf_indicators[indicator].enabled = True
+
+ logger.info("SLC setup completed")
+
+ # 4.11.2 Audio Connection Setup by HF
+ async def setup_audio_connection(self):
+ # When the HF triggers the establishment of the Codec Connection it
+ # shall send the AT command AT+BCC to the AG. The AG shall respond with
+ # OK if it will start the Codec Connection procedure, and with ERROR
+ # if it cannot start the Codec Connection procedure.
+ await self.execute_command("AT+BCC")
+
+ # 4.11.3 Codec Connection Setup
+ async def setup_codec_connection(self, codec_id: int):
+ # The AG shall send a +BCS=<Codec ID> unsolicited response to the HF.
+ # The HF shall then respond to the incoming unsolicited response with
+ # the AT command AT+BCS=<Codec ID>. The ID shall be the same as in the
+ # unsolicited response code as long as the ID is supported.
+ # If the received ID is not available, the HF shall respond with
+ # AT+BAC with its available codecs.
+ if codec_id not in self.supported_audio_codecs:
+ codecs = [str(c) for c in self.supported_audio_codecs]
+ await self.execute_command(f"AT+BAC={','.join(codecs)}")
+ return
+
+ await self.execute_command(f"AT+BCS={codec_id}")
+
+ # After sending the OK response, the AG shall open the
+ # Synchronous Connection with the settings that are determined by the
+ # ID. The HF shall be ready to accept the synchronous connection
+ # establishment as soon as it has sent the AT commands AT+BCS=<Codec ID>.
+
+ logger.info("codec connection setup completed")
+
+ # 4.13.1 Answer Incoming Call from the HF – In-Band Ringing
+ async def answer_incoming_call(self):
+ # The user accepts the incoming voice call by using the proper means
+ # provided by the HF. The HF shall then send the ATA command
+ # (see Section 4.34) to the AG. The AG shall then begin the procedure for
+ # accepting the incoming call.
+ await self.execute_command("ATA")
+
+ # 4.14.1 Reject an Incoming Call from the HF
+ async def reject_incoming_call(self):
+ # The user rejects the incoming call by using the User Interface on the
+ # Hands-Free unit. The HF shall then send the AT+CHUP command
+ # (see Section 4.34) to the AG. This may happen at any time during the
+ # procedures described in Sections 4.13.1 and 4.13.2.
+ await self.execute_command("AT+CHUP")
+
+ # 4.15.1 Terminate a Call Process from the HF
+ async def terminate_call(self):
+ # The user may abort the ongoing call process using whatever means
+ # provided by the Hands-Free unit. The HF shall send AT+CHUP command
+ # (see Section 4.34) to the AG, and the AG shall then start the
+ # procedure to terminate or interrupt the current call procedure.
+ # The AG shall then send the OK indication followed by the +CIEV result
+ # code, with the value indicating (call=0).
+ await self.execute_command("AT+CHUP")
+
+ async def update_ag_indicator(self, index: int, value: int):
+ self.ag_indicators[index].current_status = value
+ logger.info(
+ f"AG indicator updated: {self.ag_indicators[index].description}, {value}"
+ )
+
+ async def handle_unsolicited(self):
+ """Handle unsolicited result codes sent by the audio gateway."""
+ result = await self.unsolicited_queue.get()
+ if result.code == "+BCS":
+ await self.setup_codec_connection(int(result.parameters[0]))
+ elif result.code == "+CIEV":
+ await self.update_ag_indicator(
+ int(result.parameters[0]), int(result.parameters[1])
+ )
+ else:
+ logging.info(f"unhandled unsolicited response {result.code}")
+
+ async def run(self):
+ """Main rountine for the Hands-Free side of the HFP protocol.
+ Initiates the service level connection then loops handling
+ unsolicited AG responses."""
+
+ try:
+ await self.initiate_slc()
+ while True:
+ await self.handle_unsolicited()
+ except Exception:
+ logger.error("HFP-HF protocol failed with the following error:")
+ logger.error(traceback.format_exc())
+
+
+# -----------------------------------------------------------------------------
+# Normative SDP definitions
+# -----------------------------------------------------------------------------
+
+
+# Profile version (normative).
+# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
+class ProfileVersion(enum.IntEnum):
+ V1_5 = 0x0105
+ V1_6 = 0x0106
+ V1_7 = 0x0107
+ V1_8 = 0x0108
+ V1_9 = 0x0109
+
+
+# HF supported features (normative).
+# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
+class HfSdpFeature(enum.IntFlag):
+ EC_NR = 0x01 # Echo Cancel & Noise reduction
+ THREE_WAY_CALLING = 0x02
+ CLI_PRESENTATION_CAPABILITY = 0x04
+ VOICE_RECOGNITION_ACTIVATION = 0x08
+ REMOTE_VOLUME_CONTROL = 0x10
+ WIDE_BAND = 0x20 # Wide band speech
+ ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
+ VOICE_RECOGNITION_TEST = 0x80
+
+
+# AG supported features (normative).
+# Hands-Free Profile v1.8, 5.3 SDP Interoperability Requirements
+class AgSdpFeature(enum.IntFlag):
+ THREE_WAY_CALLING = 0x01
+ EC_NR = 0x02 # Echo Cancel & Noise reduction
+ VOICE_RECOGNITION_FUNCTION = 0x04
+ IN_BAND_RING_TONE_CAPABILITY = 0x08
+ VOICE_TAG = 0x10 # Attach a number to voice tag
+ WIDE_BAND = 0x20 # Wide band speech
+ ENHANCED_VOICE_RECOGNITION_STATUS = 0x40
+ VOICE_RECOGNITION_TEST = 0x80
+
+
+def sdp_records(
+ service_record_handle: int, rfcomm_channel: int, configuration: Configuration
+) -> List[ServiceAttribute]:
+ """Generate the SDP record for HFP Hands-Free support.
+ The record exposes the features supported in the input configuration,
+ and the allocated RFCOMM channel."""
+
+ hf_supported_features = 0
+
+ if HfFeature.EC_NR in configuration.supported_hf_features:
+ hf_supported_features |= HfSdpFeature.EC_NR
+ if HfFeature.THREE_WAY_CALLING in configuration.supported_hf_features:
+ hf_supported_features |= HfSdpFeature.THREE_WAY_CALLING
+ if HfFeature.CLI_PRESENTATION_CAPABILITY in configuration.supported_hf_features:
+ hf_supported_features |= HfSdpFeature.CLI_PRESENTATION_CAPABILITY
+ if HfFeature.VOICE_RECOGNITION_ACTIVATION in configuration.supported_hf_features:
+ hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_ACTIVATION
+ if HfFeature.REMOTE_VOLUME_CONTROL in configuration.supported_hf_features:
+ hf_supported_features |= HfSdpFeature.REMOTE_VOLUME_CONTROL
+ if (
+ HfFeature.ENHANCED_VOICE_RECOGNITION_STATUS
+ in configuration.supported_hf_features
+ ):
+ hf_supported_features |= HfSdpFeature.ENHANCED_VOICE_RECOGNITION_STATUS
+ if HfFeature.VOICE_RECOGNITION_TEST in configuration.supported_hf_features:
+ hf_supported_features |= HfSdpFeature.VOICE_RECOGNITION_TEST
+
+ if AudioCodec.MSBC in configuration.supported_audio_codecs:
+ hf_supported_features |= HfSdpFeature.WIDE_BAND
+
+ return [
+ ServiceAttribute(
+ SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
+ DataElement.unsigned_integer_32(service_record_handle),
+ ),
+ ServiceAttribute(
+ SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
+ DataElement.sequence(
+ [
+ DataElement.uuid(BT_HANDSFREE_SERVICE),
+ DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
+ ]
+ ),
+ ),
+ ServiceAttribute(
+ SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
+ DataElement.sequence(
+ [
+ DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
+ DataElement.sequence(
+ [
+ DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
+ DataElement.unsigned_integer_8(rfcomm_channel),
+ ]
+ ),
+ ]
+ ),
+ ),
+ ServiceAttribute(
+ SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
+ DataElement.sequence(
+ [
+ DataElement.sequence(
+ [
+ DataElement.uuid(BT_HANDSFREE_SERVICE),
+ DataElement.unsigned_integer_16(ProfileVersion.V1_8),
+ ]
+ )
+ ]
+ ),
+ ),
+ ServiceAttribute(
+ SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
+ DataElement.unsigned_integer_16(hf_supported_features),
+ ),
+ ]
diff --git a/bumble/sdp.py b/bumble/sdp.py
index 019b8e6..1d4faf9 100644
--- a/bumble/sdp.py
+++ b/bumble/sdp.py
@@ -94,6 +94,10 @@
SDP_ICON_URL_ATTRIBUTE_ID = 0X000C
SDP_ADDITIONAL_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID = 0X000D
+# Attribute Identifier (cf. Assigned Numbers for Service Discovery)
+# used by AVRCP, HFP and A2DP
+SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID = 0x0311
+
SDP_ATTRIBUTE_ID_NAMES = {
SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID: 'SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID',
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID: 'SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID',
diff --git a/examples/run_hfp_gateway.py b/examples/run_hfp_gateway.py
index 63a2a7c..eac5473 100644
--- a/examples/run_hfp_gateway.py
+++ b/examples/run_hfp_gateway.py
@@ -16,9 +16,11 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
+import collections
import sys
import os
import logging
+from typing import Union
from bumble.colors import color
@@ -30,6 +32,7 @@
BT_RFCOMM_PROTOCOL_ID,
BT_BR_EDR_TRANSPORT,
)
+from bumble import rfcomm
from bumble.rfcomm import Client
from bumble.sdp import (
Client as SDP_Client,
@@ -39,7 +42,64 @@
SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
)
-from bumble.hfp import HfpProtocol
+
+
+logger = logging.getLogger(__name__)
+
+
+# -----------------------------------------------------------------------------
+# Protocol Support
+# -----------------------------------------------------------------------------
+
+# -----------------------------------------------------------------------------
+class HfpProtocol:
+ dlc: rfcomm.DLC
+ buffer: str
+ lines: collections.deque
+ lines_available: asyncio.Event
+
+ def __init__(self, dlc: rfcomm.DLC) -> None:
+ self.dlc = dlc
+ self.buffer = ''
+ self.lines = collections.deque()
+ self.lines_available = asyncio.Event()
+
+ dlc.sink = self.feed
+
+ def feed(self, data: Union[bytes, str]) -> None:
+ # Convert the data to a string if needed
+ if isinstance(data, bytes):
+ data = data.decode('utf-8')
+
+ logger.debug(f'<<< Data received: {data}')
+
+ # Add to the buffer and look for lines
+ self.buffer += data
+ while (separator := self.buffer.find('\r')) >= 0:
+ line = self.buffer[:separator].strip()
+ self.buffer = self.buffer[separator + 1 :]
+ if len(line) > 0:
+ self.on_line(line)
+
+ def on_line(self, line: str) -> None:
+ self.lines.append(line)
+ self.lines_available.set()
+
+ def send_command_line(self, line: str) -> None:
+ logger.debug(color(f'>>> {line}', 'yellow'))
+ self.dlc.write(line + '\r')
+
+ def send_response_line(self, line: str) -> None:
+ logger.debug(color(f'>>> {line}', 'yellow'))
+ self.dlc.write('\r\n' + line + '\r\n')
+
+ async def next_line(self) -> str:
+ await self.lines_available.wait()
+ line = self.lines.popleft()
+ if not self.lines:
+ self.lines_available.clear()
+ logger.debug(color(f'<<< {line}', 'green'))
+ return line
# -----------------------------------------------------------------------------
diff --git a/examples/run_hfp_handsfree.py b/examples/run_hfp_handsfree.py
index cef29c0..5f747fc 100644
--- a/examples/run_hfp_handsfree.py
+++ b/examples/run_hfp_handsfree.py
@@ -21,82 +21,22 @@
import logging
import json
import websockets
-
+from typing import Optional
from bumble.device import Device
from bumble.transport import open_transport_or_link
from bumble.rfcomm import Server as RfcommServer
-from bumble.sdp import (
- DataElement,
- ServiceAttribute,
- SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
- SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
- SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
- SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
-)
-from bumble.core import (
- BT_GENERIC_AUDIO_SERVICE,
- BT_HANDSFREE_SERVICE,
- BT_L2CAP_PROTOCOL_ID,
- BT_RFCOMM_PROTOCOL_ID,
-)
-from bumble.hfp import HfpProtocol
-
-
-# -----------------------------------------------------------------------------
-def make_sdp_records(rfcomm_channel):
- return {
- 0x00010001: [
- ServiceAttribute(
- SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
- DataElement.unsigned_integer_32(0x00010001),
- ),
- ServiceAttribute(
- SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
- DataElement.sequence(
- [
- DataElement.uuid(BT_HANDSFREE_SERVICE),
- DataElement.uuid(BT_GENERIC_AUDIO_SERVICE),
- ]
- ),
- ),
- ServiceAttribute(
- SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
- DataElement.sequence(
- [
- DataElement.sequence([DataElement.uuid(BT_L2CAP_PROTOCOL_ID)]),
- DataElement.sequence(
- [
- DataElement.uuid(BT_RFCOMM_PROTOCOL_ID),
- DataElement.unsigned_integer_8(rfcomm_channel),
- ]
- ),
- ]
- ),
- ),
- ServiceAttribute(
- SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
- DataElement.sequence(
- [
- DataElement.sequence(
- [
- DataElement.uuid(BT_HANDSFREE_SERVICE),
- DataElement.unsigned_integer_16(0x0105),
- ]
- )
- ]
- ),
- ),
- ]
- }
+from bumble import hfp
+from bumble.hfp import HfProtocol
# -----------------------------------------------------------------------------
class UiServer:
- protocol = None
+ protocol: Optional[HfProtocol] = None
async def start(self):
- # Start a Websocket server to receive events from a web page
+ """Start a Websocket server to receive events from a web page."""
+
async def serve(websocket, _path):
while True:
try:
@@ -107,7 +47,7 @@
message_type = parsed['type']
if message_type == 'at_command':
if self.protocol is not None:
- self.protocol.send_command_line(parsed['command'])
+ await self.protocol.execute_command(parsed['command'])
except websockets.exceptions.ConnectionClosedOK:
pass
@@ -117,19 +57,11 @@
# -----------------------------------------------------------------------------
-async def protocol_loop(protocol):
- await protocol.initialize_service()
-
- while True:
- await (protocol.next_line())
-
-
-# -----------------------------------------------------------------------------
-def on_dlc(dlc):
+def on_dlc(dlc, configuration: hfp.Configuration):
print('*** DLC connected', dlc)
- protocol = HfpProtocol(dlc)
+ protocol = HfProtocol(dlc, configuration)
UiServer.protocol = protocol
- asyncio.create_task(protocol_loop(protocol))
+ asyncio.create_task(protocol.run())
# -----------------------------------------------------------------------------
@@ -143,6 +75,27 @@
async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
print('<<< connected')
+ # Hands-Free profile configuration.
+ # TODO: load configuration from file.
+ configuration = hfp.Configuration(
+ supported_hf_features=[
+ hfp.HfFeature.THREE_WAY_CALLING,
+ hfp.HfFeature.REMOTE_VOLUME_CONTROL,
+ hfp.HfFeature.ENHANCED_CALL_STATUS,
+ hfp.HfFeature.ENHANCED_CALL_CONTROL,
+ hfp.HfFeature.CODEC_NEGOTIATION,
+ hfp.HfFeature.HF_INDICATORS,
+ hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED,
+ ],
+ supported_hf_indicators=[
+ hfp.HfIndicator.BATTERY_LEVEL,
+ ],
+ supported_audio_codecs=[
+ hfp.AudioCodec.CVSD,
+ hfp.AudioCodec.MSBC,
+ ],
+ )
+
# Create a device
device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)
device.classic_enabled = True
@@ -151,11 +104,13 @@
rfcomm_server = RfcommServer(device)
# Listen for incoming DLC connections
- channel_number = rfcomm_server.listen(on_dlc)
+ channel_number = rfcomm_server.listen(lambda dlc: on_dlc(dlc, configuration))
print(f'### Listening for connection on channel {channel_number}')
# Advertise the HFP RFComm channel in the SDP
- device.sdp_service_records = make_sdp_records(channel_number)
+ device.sdp_service_records = {
+ 0x00010001: hfp.sdp_records(0x00010001, channel_number, configuration)
+ }
# Let's go!
await device.power_on()
diff --git a/tests/at_test.py b/tests/at_test.py
new file mode 100644
index 0000000..a0f00dd
--- /dev/null
+++ b/tests/at_test.py
@@ -0,0 +1,35 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from bumble import at
+
+
+def test_tokenize_parameters():
+ assert at.tokenize_parameters(b'1, 2, 3') == [b'1', b',', b'2', b',', b'3']
+ assert at.tokenize_parameters(b'"1, 2, 3"') == [b'1, 2, 3']
+ assert at.tokenize_parameters(b'(1, "2, 3")') == [b'(', b'1', b',', b'2, 3', b')']
+
+
+def test_parse_parameters():
+ assert at.parse_parameters(b'1, 2, 3') == [b'1', b'2', b'3']
+ assert at.parse_parameters(b'1,, 3') == [b'1', b'', b'3']
+ assert at.parse_parameters(b'"1, 2, 3"') == [b'1, 2, 3']
+ assert at.parse_parameters(b'1, (2, (3))') == [b'1', [b'2', [b'3']]]
+ assert at.parse_parameters(b'1, (2, "3, 4"), 5') == [b'1', [b'2', b'3, 4'], b'5']
+
+
+# -----------------------------------------------------------------------------
+if __name__ == '__main__':
+ test_tokenize_parameters()
+ test_parse_parameters()