| # Copyright 2021-2022 Google LLC |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # https://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # ----------------------------------------------------------------------------- |
| # Imports |
| # ----------------------------------------------------------------------------- |
| import asyncio |
| import logging |
| import os |
| import pytest |
| import pytest_asyncio |
| |
| from typing import Tuple, Optional |
| |
| from .test_utils import TwoDevices |
| from bumble import core |
| from bumble import hfp |
| from bumble import rfcomm |
| from bumble import hci |
| |
| |
| # ----------------------------------------------------------------------------- |
| # Logging |
| # ----------------------------------------------------------------------------- |
| logger = logging.getLogger(__name__) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def _default_hf_configuration() -> hfp.HfConfiguration: |
| return hfp.HfConfiguration( |
| supported_hf_features=[ |
| hfp.HfFeature.CODEC_NEGOTIATION, |
| hfp.HfFeature.ESCO_S4_SETTINGS_SUPPORTED, |
| hfp.HfFeature.HF_INDICATORS, |
| hfp.HfFeature.ENHANCED_CALL_STATUS, |
| hfp.HfFeature.THREE_WAY_CALLING, |
| hfp.HfFeature.CLI_PRESENTATION_CAPABILITY, |
| ], |
| supported_hf_indicators=[ |
| hfp.HfIndicator.ENHANCED_SAFETY, |
| hfp.HfIndicator.BATTERY_LEVEL, |
| ], |
| supported_audio_codecs=[ |
| hfp.AudioCodec.CVSD, |
| hfp.AudioCodec.MSBC, |
| ], |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def _default_hf_sdp_features() -> hfp.HfSdpFeature: |
| return ( |
| hfp.HfSdpFeature.WIDE_BAND |
| | hfp.HfSdpFeature.THREE_WAY_CALLING |
| | hfp.HfSdpFeature.CLI_PRESENTATION_CAPABILITY |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def _default_ag_configuration() -> hfp.AgConfiguration: |
| return hfp.AgConfiguration( |
| supported_ag_features=[ |
| hfp.AgFeature.HF_INDICATORS, |
| hfp.AgFeature.IN_BAND_RING_TONE_CAPABILITY, |
| hfp.AgFeature.REJECT_CALL, |
| hfp.AgFeature.CODEC_NEGOTIATION, |
| hfp.AgFeature.ESCO_S4_SETTINGS_SUPPORTED, |
| hfp.AgFeature.ENHANCED_CALL_STATUS, |
| hfp.AgFeature.THREE_WAY_CALLING, |
| ], |
| supported_ag_indicators=[ |
| hfp.AgIndicatorState.call(), |
| hfp.AgIndicatorState.service(), |
| hfp.AgIndicatorState.callsetup(), |
| hfp.AgIndicatorState.callsetup(), |
| hfp.AgIndicatorState.signal(), |
| hfp.AgIndicatorState.roam(), |
| hfp.AgIndicatorState.battchg(), |
| ], |
| supported_hf_indicators=[ |
| hfp.HfIndicator.ENHANCED_SAFETY, |
| hfp.HfIndicator.BATTERY_LEVEL, |
| ], |
| supported_ag_call_hold_operations=[ |
| hfp.CallHoldOperation.ADD_HELD_CALL, |
| hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, |
| hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, |
| hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, |
| hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, |
| hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, |
| hfp.CallHoldOperation.CONNECT_TWO_CALLS, |
| ], |
| supported_audio_codecs=[hfp.AudioCodec.CVSD, hfp.AudioCodec.MSBC], |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| def _default_ag_sdp_features() -> hfp.AgSdpFeature: |
| return ( |
| hfp.AgSdpFeature.WIDE_BAND |
| | hfp.AgSdpFeature.IN_BAND_RING_TONE_CAPABILITY |
| | hfp.AgSdpFeature.THREE_WAY_CALLING |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def make_hfp_connections( |
| hf_config: Optional[hfp.HfConfiguration] = None, |
| ag_config: Optional[hfp.AgConfiguration] = None, |
| ): |
| if not hf_config: |
| hf_config = _default_hf_configuration() |
| if not ag_config: |
| ag_config = _default_ag_configuration() |
| |
| # Setup devices |
| devices = TwoDevices() |
| await devices.setup_connection() |
| |
| # Setup RFCOMM channel |
| wait_dlc = asyncio.get_running_loop().create_future() |
| rfcomm_channel = rfcomm.Server(devices.devices[0]).listen(wait_dlc.set_result) |
| assert devices.connections[0] |
| assert devices.connections[1] |
| client_mux = await rfcomm.Client(devices.connections[1]).start() |
| |
| client_dlc = await client_mux.open_dlc(rfcomm_channel) |
| server_dlc = await wait_dlc |
| |
| # Setup HFP connection |
| hf = hfp.HfProtocol(client_dlc, hf_config) |
| ag = hfp.AgProtocol(server_dlc, ag_config) |
| |
| await hf.initiate_slc() |
| return (hf, ag) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest_asyncio.fixture |
| async def hfp_connections(): |
| hf, ag = await make_hfp_connections() |
| hf_loop_task = asyncio.create_task(hf.run()) |
| |
| try: |
| yield (hf, ag) |
| finally: |
| # Close the coroutine. |
| hf.unsolicited_queue.put_nowait(None) |
| await hf_loop_task |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_slc_with_minimal_features(): |
| hf, ag = await make_hfp_connections( |
| hfp.HfConfiguration( |
| supported_audio_codecs=[], |
| supported_hf_features=[], |
| supported_hf_indicators=[], |
| ), |
| hfp.AgConfiguration( |
| supported_ag_call_hold_operations=[], |
| supported_ag_features=[], |
| supported_ag_indicators=[ |
| hfp.AgIndicatorState( |
| indicator=hfp.AgIndicator.CALL, |
| supported_values={0, 1}, |
| current_status=0, |
| ) |
| ], |
| supported_hf_indicators=[], |
| supported_audio_codecs=[], |
| ), |
| ) |
| |
| assert hf.supported_ag_features == ag.supported_ag_features |
| assert hf.supported_hf_features == ag.supported_hf_features |
| assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations |
| for a, b in zip(hf.ag_indicators, ag.ag_indicators): |
| assert a.indicator == b.indicator |
| assert a.current_status == b.current_status |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_slc(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| |
| assert hf.supported_ag_features == ag.supported_ag_features |
| assert hf.supported_hf_features == ag.supported_hf_features |
| assert hf.supported_ag_call_hold_operations == ag.supported_ag_call_hold_operations |
| for a, b in zip(hf.ag_indicators, ag.ag_indicators): |
| assert a.indicator == b.indicator |
| assert a.current_status == b.current_status |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_ag_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| |
| future = asyncio.get_running_loop().create_future() |
| hf.on('ag_indicator', future.set_result) |
| |
| ag.update_ag_indicator(hfp.AgIndicator.CALL, 1) |
| |
| indicator: hfp.AgIndicatorState = await future |
| assert indicator.current_status == 1 |
| assert indicator.indicator == hfp.AgIndicator.CALL |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_hf_indicator(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| |
| future = asyncio.get_running_loop().create_future() |
| ag.on('hf_indicator', future.set_result) |
| |
| await hf.execute_command('AT+BIEV=2,100') |
| |
| indicator: hfp.HfIndicatorState = await future |
| assert indicator.current_status == 100 |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_codec_negotiation( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| |
| futures = [ |
| asyncio.get_running_loop().create_future(), |
| asyncio.get_running_loop().create_future(), |
| ] |
| hf.on('codec_negotiation', futures[0].set_result) |
| ag.on('codec_negotiation', futures[1].set_result) |
| await ag.negotiate_codec(hfp.AudioCodec.MSBC) |
| |
| assert await futures[0] == await futures[1] |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_dial(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| NUMBER = 'ATD123456789' |
| |
| future = asyncio.get_running_loop().create_future() |
| ag.on('dial', future.set_result) |
| await hf.execute_command(f'ATD{NUMBER}') |
| |
| number: str = await future |
| assert number == NUMBER |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_answer(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| |
| future = asyncio.get_running_loop().create_future() |
| ag.on('answer', lambda: future.set_result(None)) |
| await hf.answer_incoming_call() |
| |
| await future |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_reject_incoming_call( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| |
| future = asyncio.get_running_loop().create_future() |
| ag.on('hang_up', lambda: future.set_result(None)) |
| await hf.reject_incoming_call() |
| |
| await future |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_terminate_call(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| |
| future = asyncio.get_running_loop().create_future() |
| ag.on('hang_up', lambda: future.set_result(None)) |
| await hf.terminate_call() |
| |
| await future |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_query_calls_without_calls( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| |
| assert await hf.query_current_calls() == [] |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_query_calls_with_calls( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| ag.calls.append( |
| hfp.CallInfo( |
| index=1, |
| direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, |
| status=hfp.CallInfoStatus.ACTIVE, |
| mode=hfp.CallInfoMode.VOICE, |
| multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, |
| number='123456789', |
| ) |
| ) |
| |
| assert await hf.query_current_calls() == ag.calls |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| @pytest.mark.parametrize( |
| "operation,", |
| ( |
| hfp.CallHoldOperation.RELEASE_ALL_HELD_CALLS, |
| hfp.CallHoldOperation.RELEASE_ALL_ACTIVE_CALLS, |
| hfp.CallHoldOperation.HOLD_ALL_ACTIVE_CALLS, |
| hfp.CallHoldOperation.ADD_HELD_CALL, |
| hfp.CallHoldOperation.CONNECT_TWO_CALLS, |
| ), |
| ) |
| async def test_hold_call_without_call_index( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], |
| operation: hfp.CallHoldOperation, |
| ): |
| hf, ag = hfp_connections |
| call_hold_future = asyncio.get_running_loop().create_future() |
| ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) |
| |
| await hf.execute_command(f"AT+CHLD={operation.value}") |
| |
| assert (await call_hold_future) == (operation, None) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| @pytest.mark.parametrize( |
| "operation,", |
| ( |
| hfp.CallHoldOperation.RELEASE_SPECIFIC_CALL, |
| hfp.CallHoldOperation.HOLD_ALL_CALLS_EXCEPT, |
| ), |
| ) |
| async def test_hold_call_with_call_index( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol], |
| operation: hfp.CallHoldOperation, |
| ): |
| hf, ag = hfp_connections |
| call_hold_future = asyncio.get_running_loop().create_future() |
| ag.on("call_hold", lambda op, index: call_hold_future.set_result((op, index))) |
| ag.calls.append( |
| hfp.CallInfo( |
| index=1, |
| direction=hfp.CallInfoDirection.MOBILE_ORIGINATED_CALL, |
| status=hfp.CallInfoStatus.ACTIVE, |
| mode=hfp.CallInfoMode.VOICE, |
| multi_party=hfp.CallInfoMultiParty.NOT_IN_CONFERENCE, |
| number='123456789', |
| ) |
| ) |
| |
| await hf.execute_command(f"AT+CHLD={operation.value.replace('x', '1')}") |
| |
| assert (await call_hold_future) == (operation, 1) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_ring(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| ring_future = asyncio.get_running_loop().create_future() |
| hf.on("ring", lambda: ring_future.set_result(None)) |
| |
| ag.send_ring() |
| |
| await ring_future |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_speaker_volume(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| speaker_volume_future = asyncio.get_running_loop().create_future() |
| hf.on("speaker_volume", speaker_volume_future.set_result) |
| |
| ag.set_speaker_volume(10) |
| |
| assert await speaker_volume_future == 10 |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_microphone_volume( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| microphone_volume_future = asyncio.get_running_loop().create_future() |
| hf.on("microphone_volume", microphone_volume_future.set_result) |
| |
| ag.set_microphone_volume(10) |
| |
| assert await microphone_volume_future == 10 |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_cli_notification(hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol]): |
| hf, ag = hfp_connections |
| cli_notification_future = asyncio.get_running_loop().create_future() |
| hf.on("cli_notification", cli_notification_future.set_result) |
| |
| ag.send_cli_notification( |
| hfp.CallLineIdentification(number="\"123456789\"", type=129, alpha="\"Bumble\"") |
| ) |
| |
| assert await cli_notification_future == hfp.CallLineIdentification( |
| number="123456789", type=129, alpha="Bumble", subaddr="", satype=None |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_voice_recognition_from_hf( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| voice_recognition_future = asyncio.get_running_loop().create_future() |
| ag.on("voice_recognition", voice_recognition_future.set_result) |
| |
| await hf.execute_command("AT+BVRA=1") |
| |
| assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_voice_recognition_from_ag( |
| hfp_connections: Tuple[hfp.HfProtocol, hfp.AgProtocol] |
| ): |
| hf, ag = hfp_connections |
| voice_recognition_future = asyncio.get_running_loop().create_future() |
| hf.on("voice_recognition", voice_recognition_future.set_result) |
| |
| ag.send_response("+BVRA: 1") |
| |
| assert await voice_recognition_future == hfp.VoiceRecognitionState.ENABLE |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_hf_sdp_record(): |
| devices = TwoDevices() |
| await devices.setup_connection() |
| |
| devices[0].sdp_service_records[1] = hfp.make_hf_sdp_records( |
| 1, 2, _default_hf_configuration(), hfp.ProfileVersion.V1_8 |
| ) |
| |
| assert await hfp.find_hf_sdp_record(devices.connections[1]) == ( |
| 2, |
| hfp.ProfileVersion.V1_8, |
| _default_hf_sdp_features(), |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_ag_sdp_record(): |
| devices = TwoDevices() |
| await devices.setup_connection() |
| |
| devices[0].sdp_service_records[1] = hfp.make_ag_sdp_records( |
| 1, 2, _default_ag_configuration(), hfp.ProfileVersion.V1_8 |
| ) |
| |
| assert await hfp.find_ag_sdp_record(devices.connections[1]) == ( |
| 2, |
| hfp.ProfileVersion.V1_8, |
| _default_ag_sdp_features(), |
| ) |
| |
| |
| # ----------------------------------------------------------------------------- |
| @pytest.mark.asyncio |
| async def test_sco_setup(): |
| devices = TwoDevices() |
| |
| # Enable Classic connections |
| devices[0].classic_enabled = True |
| devices[1].classic_enabled = True |
| |
| # Start |
| await devices[0].power_on() |
| await devices[1].power_on() |
| |
| connections = await asyncio.gather( |
| devices[0].connect( |
| devices[1].public_address, transport=core.BT_BR_EDR_TRANSPORT |
| ), |
| devices[1].accept(devices[0].public_address), |
| ) |
| |
| def on_sco_request(_connection, _link_type: int): |
| connections[1].abort_on( |
| 'disconnection', |
| devices[1].send_command( |
| hci.HCI_Enhanced_Accept_Synchronous_Connection_Request_Command( |
| bd_addr=connections[1].peer_address, |
| **hfp.ESCO_PARAMETERS[ |
| hfp.DefaultCodecParameters.ESCO_CVSD_S1 |
| ].asdict(), |
| ) |
| ), |
| ) |
| |
| devices[1].on('sco_request', on_sco_request) |
| |
| sco_connection_futures = [ |
| asyncio.get_running_loop().create_future(), |
| asyncio.get_running_loop().create_future(), |
| ] |
| |
| for device, future in zip(devices, sco_connection_futures): |
| device.on('sco_connection', future.set_result) |
| |
| await devices[0].send_command( |
| hci.HCI_Enhanced_Setup_Synchronous_Connection_Command( |
| connection_handle=connections[0].handle, |
| **hfp.ESCO_PARAMETERS[hfp.DefaultCodecParameters.ESCO_CVSD_S1].asdict(), |
| ) |
| ) |
| sco_connections = await asyncio.gather(*sco_connection_futures) |
| |
| sco_disconnection_futures = [ |
| asyncio.get_running_loop().create_future(), |
| asyncio.get_running_loop().create_future(), |
| ] |
| for future, sco_connection in zip(sco_disconnection_futures, sco_connections): |
| sco_connection.on('disconnection', future.set_result) |
| |
| await sco_connections[0].disconnect() |
| await asyncio.gather(*sco_disconnection_futures) |
| |
| |
| # ----------------------------------------------------------------------------- |
| async def run(): |
| await test_slc() |
| |
| |
| # ----------------------------------------------------------------------------- |
| if __name__ == '__main__': |
| logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) |
| asyncio.run(run()) |