Merge "Merge remote-tracking branch 'aosp/upstream-main' into bumble" into main am: 4df3d0aeaf am: da98c33cb6
Original change: https://android-review.googlesource.com/c/platform/external/python/bumble/+/3249813
Change-Id: If75b00746aac1bc1e8077158158e726a784b55aa
Signed-off-by: Automerger Merge Worker <[email protected]>
diff --git a/bumble/decoder.py b/bumble/decoder.py
index 2eb70bc..83a23b1 100644
--- a/bumble/decoder.py
+++ b/bumble/decoder.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from typing import Union
+
# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
@@ -149,7 +151,7 @@
# -----------------------------------------------------------------------------
# Classes
# -----------------------------------------------------------------------------
-class G722Decoder(object):
+class G722Decoder:
"""G.722 decoder with bitrate 64kbit/s.
For the Blocks in the sub-band decoders, please refer to the G.722
@@ -157,7 +159,7 @@
https://www.itu.int/rec/T-REC-G.722-201209-I
"""
- def __init__(self):
+ def __init__(self) -> None:
self._x = [0] * 24
self._band = [Band(), Band()]
# The initial value in BLOCK 3L
@@ -165,12 +167,12 @@
# The initial value in BLOCK 3H
self._band[1].det = 8
- def decode_frame(self, encoded_data) -> bytearray:
+ def decode_frame(self, encoded_data: Union[bytes, bytearray]) -> bytearray:
result_array = bytearray(len(encoded_data) * 4)
self.g722_decode(result_array, encoded_data)
return result_array
- def g722_decode(self, result_array, encoded_data) -> int:
+ def g722_decode(self, result_array, encoded_data: Union[bytes, bytearray]) -> int:
"""Decode the data frame using g722 decoder."""
result_length = 0
@@ -198,14 +200,16 @@
return result_length
- def update_decoded_result(self, xout, byte_length, byte_array) -> int:
+ def update_decoded_result(
+ self, xout: int, byte_length: int, byte_array: bytearray
+ ) -> int:
result = (int)(xout >> 11)
bytes_result = result.to_bytes(2, 'little', signed=True)
byte_array[byte_length] = bytes_result[0]
byte_array[byte_length + 1] = bytes_result[1]
return byte_length + 2
- def lower_sub_band_decoder(self, lower_bits) -> int:
+ def lower_sub_band_decoder(self, lower_bits: int) -> int:
"""Lower sub-band decoder for last six bits."""
# Block 5L
@@ -258,7 +262,7 @@
return rlow
- def higher_sub_band_decoder(self, higher_bits) -> int:
+ def higher_sub_band_decoder(self, higher_bits: int) -> int:
"""Higher sub-band decoder for first two bits."""
# Block 2H
@@ -306,14 +310,14 @@
# -----------------------------------------------------------------------------
-class Band(object):
- """Structure for G722 decode proccessing."""
+class Band:
+ """Structure for G722 decode processing."""
s: int = 0
nb: int = 0
det: int = 0
- def __init__(self):
+ def __init__(self) -> None:
self._sp = 0
self._sz = 0
self._r = [0] * 3
diff --git a/bumble/pandora/__init__.py b/bumble/pandora/__init__.py
index e02f54a..8fb4b6e 100644
--- a/bumble/pandora/__init__.py
+++ b/bumble/pandora/__init__.py
@@ -25,8 +25,10 @@
from .config import Config
from .device import PandoraDevice
from .host import HostService
+from .l2cap import L2CAPService
from .security import SecurityService, SecurityStorageService
from pandora.host_grpc_aio import add_HostServicer_to_server
+from pandora.l2cap_grpc_aio import add_L2CAPServicer_to_server
from pandora.security_grpc_aio import (
add_SecurityServicer_to_server,
add_SecurityStorageServicer_to_server,
@@ -77,6 +79,7 @@
add_SecurityStorageServicer_to_server(
SecurityStorageService(bumble.device, config), server
)
+ add_L2CAPServicer_to_server(L2CAPService(bumble.device, config), server)
# call hooks if any.
for hook in _SERVICERS_HOOKS:
diff --git a/bumble/pandora/l2cap.py b/bumble/pandora/l2cap.py
new file mode 100644
index 0000000..f88c434
--- /dev/null
+++ b/bumble/pandora/l2cap.py
@@ -0,0 +1,322 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from __future__ import annotations
+import asyncio
+import grpc
+import json
+import logging
+import threading
+
+from . import utils
+from .config import Config
+from bumble.device import Device
+from bumble.l2cap import (
+ ClassicChannel,
+ ClassicChannelServer,
+ ClassicChannelSpec,
+ LeCreditBasedChannel,
+ LeCreditBasedChannelServer,
+ LeCreditBasedChannelSpec,
+)
+from google.protobuf import any_pb2, empty_pb2 # pytype: disable=pyi-error
+from pandora.l2cap_grpc_aio import L2CAPServicer # pytype: disable=pyi-error
+from pandora.l2cap_pb2 import ( # pytype: disable=pyi-error
+ COMMAND_NOT_UNDERSTOOD,
+ INVALID_CID_IN_REQUEST,
+ Channel,
+ ConnectRequest,
+ ConnectResponse,
+ CreditBasedChannelRequest,
+ DisconnectRequest,
+ DisconnectResponse,
+ ReceiveRequest,
+ ReceiveResponse,
+ SendRequest,
+ SendResponse,
+ WaitConnectionRequest,
+ WaitConnectionResponse,
+ WaitDisconnectionRequest,
+ WaitDisconnectionResponse,
+)
+from typing import Any, AsyncGenerator, Dict, Optional, Union
+
+
+class L2CAPService(L2CAPServicer):
+ def __init__(self, device: Device, config: Config) -> None:
+ self.log = utils.BumbleServerLoggerAdapter(
+ logging.getLogger(), {'service_name': 'L2CAP', 'device': device}
+ )
+ self.device = device
+ self.config = config
+ self.sdu_queue: asyncio.Queue = asyncio.Queue()
+
+ @utils.rpc
+ async def WaitConnection(
+ self, request: WaitConnectionRequest, context: grpc.ServicerContext
+ ) -> WaitConnectionResponse:
+ self.log.debug('WaitConnection')
+ if not request.connection:
+ raise ValueError('A valid connection field must be set')
+
+ # find connection on device based on connection cookie value
+ connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
+ connection = self.device.lookup_connection(connection_handle)
+
+ if not connection:
+ raise ValueError('The connection specified is invalid.')
+
+ oneof = request.WhichOneof('type')
+ self.log.debug(f'WaitConnection channel request type: {oneof}.')
+ channel_type = getattr(request, oneof)
+ spec: Optional[Union[ClassicChannelSpec, LeCreditBasedChannelSpec]] = None
+ l2cap_server: Optional[
+ Union[ClassicChannelServer, LeCreditBasedChannelServer]
+ ] = None
+ if isinstance(channel_type, CreditBasedChannelRequest):
+ spec = LeCreditBasedChannelSpec(
+ psm=channel_type.spsm,
+ max_credits=channel_type.initial_credit,
+ mtu=channel_type.mtu,
+ mps=channel_type.mps,
+ )
+ if channel_type.spsm in self.device.l2cap_channel_manager.le_coc_servers:
+ l2cap_server = self.device.l2cap_channel_manager.le_coc_servers[
+ channel_type.spsm
+ ]
+ else:
+ spec = ClassicChannelSpec(
+ psm=channel_type.psm,
+ mtu=channel_type.mtu,
+ )
+ if channel_type.psm in self.device.l2cap_channel_manager.servers:
+ l2cap_server = self.device.l2cap_channel_manager.servers[
+ channel_type.psm
+ ]
+
+ self.log.info(f'Listening for L2CAP connection on PSM {spec.psm}')
+ channel_future: asyncio.Future[Union[ClassicChannel, LeCreditBasedChannel]] = (
+ asyncio.get_running_loop().create_future()
+ )
+
+ def on_l2cap_channel(
+ l2cap_channel: Union[ClassicChannel, LeCreditBasedChannel]
+ ):
+ try:
+ channel_future.set_result(l2cap_channel)
+ self.log.debug(
+ f'Channel future set successfully with channel= {l2cap_channel}'
+ )
+ except Exception as e:
+ self.log.error(f'Failed to set channel future: {e}')
+
+ if l2cap_server is None:
+ l2cap_server = self.device.create_l2cap_server(
+ spec=spec, handler=on_l2cap_channel
+ )
+ else:
+ l2cap_server.on('connection', on_l2cap_channel)
+
+ try:
+ self.log.debug('Waiting for a channel connection.')
+ l2cap_channel = await channel_future
+ channel = self.channel_to_proto(l2cap_channel)
+ return WaitConnectionResponse(channel=channel)
+ except Exception as e:
+ self.log.warning(f'Exception: {e}')
+ return WaitConnectionResponse(error=COMMAND_NOT_UNDERSTOOD)
+
+ @utils.rpc
+ async def WaitDisconnection(
+ self, request: WaitDisconnectionRequest, context: grpc.ServicerContext
+ ) -> WaitDisconnectionResponse:
+ try:
+ self.log.debug('WaitDisconnection')
+ l2cap_channel = self.get_l2cap_channel(request.channel)
+ if l2cap_channel is None:
+ self.log.warn('WaitDisconnection: Unable to find the channel')
+ return WaitDisconnectionResponse(error=INVALID_CID_IN_REQUEST)
+
+ self.log.debug('WaitDisconnection: Sending a disconnection request')
+ closed_event: asyncio.Event = asyncio.Event()
+
+ def on_close():
+ self.log.info('Received a close event')
+ closed_event.set()
+
+ l2cap_channel.on('close', on_close)
+ await closed_event.wait()
+ return WaitDisconnectionResponse(success=empty_pb2.Empty())
+ except Exception as e:
+ self.log.exception(f'WaitDisonnection failed: {e}')
+ return WaitDisconnectionResponse(error=COMMAND_NOT_UNDERSTOOD)
+
+ @utils.rpc
+ async def Receive(
+ self, request: ReceiveRequest, context: grpc.ServicerContext
+ ) -> AsyncGenerator[ReceiveResponse, None]:
+ self.log.debug('Receive')
+ oneof = request.WhichOneof('source')
+ self.log.debug(f'Source: {oneof}.')
+ channel = getattr(request, oneof)
+
+ if not isinstance(channel, Channel):
+ raise NotImplementedError(f'TODO: {type(channel)} not currently supported.')
+
+ def on_channel_sdu(sdu):
+ async def handle_sdu():
+ await self.sdu_queue.put(sdu)
+
+ asyncio.create_task(handle_sdu())
+
+ l2cap_channel = self.get_l2cap_channel(channel)
+ if l2cap_channel is None:
+ raise ValueError('The channel in the request is not valid.')
+
+ l2cap_channel.sink = on_channel_sdu
+ while sdu := await self.sdu_queue.get():
+ # Retrieve the next SDU from the queue
+ self.log.debug(f'Receive: Received {len(sdu)} bytes -> {sdu.decode()}')
+ response = ReceiveResponse(data=sdu)
+ yield response
+
+ @utils.rpc
+ async def Connect(
+ self, request: ConnectRequest, context: grpc.ServicerContext
+ ) -> ConnectResponse:
+ self.log.debug('Connect')
+
+ if not request.connection:
+ raise ValueError('A valid connection field must be set')
+
+ # find connection on device based on connection cookie value
+ connection_handle = int.from_bytes(request.connection.cookie.value, 'big')
+ connection = self.device.lookup_connection(connection_handle)
+
+ if not connection:
+ raise ValueError('The connection specified is invalid.')
+
+ oneof = request.WhichOneof('type')
+ self.log.debug(f'Channel request type: {oneof}.')
+ channel_type = getattr(request, oneof)
+ spec: Optional[Union[ClassicChannelSpec, LeCreditBasedChannelSpec]] = None
+ if isinstance(channel_type, CreditBasedChannelRequest):
+ spec = LeCreditBasedChannelSpec(
+ psm=channel_type.spsm,
+ max_credits=channel_type.initial_credit,
+ mtu=channel_type.mtu,
+ mps=channel_type.mps,
+ )
+ else:
+ spec = ClassicChannelSpec(
+ psm=channel_type.psm,
+ mtu=channel_type.mtu,
+ )
+
+ try:
+ self.log.info(f'Opening L2CAP channel on PSM = {spec.psm}')
+ l2cap_channel = await connection.create_l2cap_channel(spec=spec)
+ self.log.info(f'L2CAP channel: {l2cap_channel}')
+ except Exception as e:
+ l2cap_channel = None
+ self.log.exception(f'Connection failed: {e}')
+
+ if not l2cap_channel:
+ return ConnectResponse(error=COMMAND_NOT_UNDERSTOOD)
+
+ channel = self.channel_to_proto(l2cap_channel)
+ return ConnectResponse(channel=channel)
+
+ @utils.rpc
+ async def Disconnect(
+ self, request: DisconnectRequest, context: grpc.ServicerContext
+ ) -> DisconnectResponse:
+ try:
+ self.log.debug('Disconnect')
+ l2cap_channel = self.get_l2cap_channel(request.channel)
+ if not l2cap_channel:
+ self.log.warn('Disconnect: Unable to find the channel')
+ return DisconnectResponse(error=INVALID_CID_IN_REQUEST)
+
+ await l2cap_channel.disconnect()
+ return DisconnectResponse(success=empty_pb2.Empty())
+ except Exception as e:
+ self.log.exception(f'Disonnect failed: {e}')
+ return DisconnectResponse(error=COMMAND_NOT_UNDERSTOOD)
+
+ @utils.rpc
+ async def Send(
+ self, request: SendRequest, context: grpc.ServicerContext
+ ) -> SendResponse:
+ self.log.debug('Send')
+ try:
+ oneof = request.WhichOneof('sink')
+ self.log.debug(f'Sink: {oneof}.')
+ channel = getattr(request, oneof)
+
+ if not isinstance(channel, Channel):
+ raise NotImplementedError(
+ f'TODO: {type(channel)} not currently supported.'
+ )
+ l2cap_channel = self.get_l2cap_channel(channel)
+ if not l2cap_channel:
+ return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
+ if isinstance(l2cap_channel, ClassicChannel):
+ l2cap_channel.send_pdu(request.data)
+ else:
+ l2cap_channel.write(request.data)
+ return SendResponse(success=empty_pb2.Empty())
+ except Exception as e:
+ self.log.exception(f'Disonnect failed: {e}')
+ return SendResponse(error=COMMAND_NOT_UNDERSTOOD)
+
+ def get_l2cap_channel(
+ self, channel: Channel
+ ) -> Optional[Union[ClassicChannel, LeCreditBasedChannel]]:
+ parameters = self.get_channel_parameters(channel)
+ connection_handle = parameters.get('connection_handle', 0)
+ destination_cid = parameters.get('destination_cid', 0)
+ is_classic = parameters.get('is_classic', False)
+ self.log.debug(
+ f'get_l2cap_channel: Connection handle:{connection_handle}, cid:{destination_cid}'
+ )
+ l2cap_channel: Optional[Union[ClassicChannel, LeCreditBasedChannel]] = None
+ if is_classic:
+ l2cap_channel = self.device.l2cap_channel_manager.find_channel(
+ connection_handle, destination_cid
+ )
+ else:
+ l2cap_channel = self.device.l2cap_channel_manager.find_le_coc_channel(
+ connection_handle, destination_cid
+ )
+ return l2cap_channel
+
+ def channel_to_proto(
+ self, l2cap_channel: Union[ClassicChannel, LeCreditBasedChannel]
+ ) -> Channel:
+ parameters = {
+ "source_cid": l2cap_channel.source_cid,
+ "destination_cid": l2cap_channel.destination_cid,
+ "connection_handle": l2cap_channel.connection.handle,
+ "is_classic": True if isinstance(l2cap_channel, ClassicChannel) else False,
+ }
+ self.log.info(f'Channel parameters: {parameters}')
+ cookie = any_pb2.Any()
+ cookie.value = json.dumps(parameters).encode()
+ return Channel(cookie=cookie)
+
+ def get_channel_parameters(self, channel: Channel) -> Dict['str', Any]:
+ cookie_value = channel.cookie.value.decode()
+ parameters = json.loads(cookie_value)
+ self.log.info(f'Channel parameters: {parameters}')
+ return parameters
diff --git a/bumble/profiles/asha.py b/bumble/profiles/asha.py
new file mode 100644
index 0000000..b2aa441
--- /dev/null
+++ b/bumble/profiles/asha.py
@@ -0,0 +1,295 @@
+# Copyright 2021-2022 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# -----------------------------------------------------------------------------
+# Imports
+# -----------------------------------------------------------------------------
+import enum
+import struct
+import logging
+from typing import List, Optional, Callable, Union, Any
+
+from bumble import l2cap
+from bumble import utils
+from bumble import gatt
+from bumble import gatt_client
+from bumble.core import AdvertisingData
+from bumble.device import Device, Connection
+
+# -----------------------------------------------------------------------------
+# Logging
+# -----------------------------------------------------------------------------
+_logger = logging.getLogger(__name__)
+
+
+# -----------------------------------------------------------------------------
+# Constants
+# -----------------------------------------------------------------------------
+class DeviceCapabilities(enum.IntFlag):
+ IS_RIGHT = 0x01
+ IS_DUAL = 0x02
+ CSIS_SUPPORTED = 0x04
+
+
+class FeatureMap(enum.IntFlag):
+ LE_COC_AUDIO_OUTPUT_STREAMING_SUPPORTED = 0x01
+
+
+class AudioType(utils.OpenIntEnum):
+ UNKNOWN = 0x00
+ RINGTONE = 0x01
+ PHONE_CALL = 0x02
+ MEDIA = 0x03
+
+
+class OpCode(utils.OpenIntEnum):
+ START = 1
+ STOP = 2
+ STATUS = 3
+
+
+class Codec(utils.OpenIntEnum):
+ G_722_16KHZ = 1
+
+
+class SupportedCodecs(enum.IntFlag):
+ G_722_16KHZ = 1 << Codec.G_722_16KHZ
+
+
+class PeripheralStatus(utils.OpenIntEnum):
+ """Status update on the other peripheral."""
+
+ OTHER_PERIPHERAL_DISCONNECTED = 1
+ OTHER_PERIPHERAL_CONNECTED = 2
+ CONNECTION_PARAMETER_UPDATED = 3
+
+
+class AudioStatus(utils.OpenIntEnum):
+ """Status report field for the audio control point."""
+
+ OK = 0
+ UNKNOWN_COMMAND = -1
+ ILLEGAL_PARAMETERS = -2
+
+
+# -----------------------------------------------------------------------------
+class AshaService(gatt.TemplateService):
+ UUID = gatt.GATT_ASHA_SERVICE
+
+ audio_sink: Optional[Callable[[bytes], Any]]
+ active_codec: Optional[Codec] = None
+ audio_type: Optional[AudioType] = None
+ volume: Optional[int] = None
+ other_state: Optional[int] = None
+ connection: Optional[Connection] = None
+
+ def __init__(
+ self,
+ capability: int,
+ hisyncid: Union[List[int], bytes],
+ device: Device,
+ psm: int = 0,
+ audio_sink: Optional[Callable[[bytes], Any]] = None,
+ feature_map: int = FeatureMap.LE_COC_AUDIO_OUTPUT_STREAMING_SUPPORTED,
+ protocol_version: int = 0x01,
+ render_delay_milliseconds: int = 0,
+ supported_codecs: int = SupportedCodecs.G_722_16KHZ,
+ ) -> None:
+ if len(hisyncid) != 8:
+ _logger.warning('HiSyncId should have a length of 8, got %d', len(hisyncid))
+
+ self.hisyncid = bytes(hisyncid)
+ self.capability = capability
+ self.device = device
+ self.audio_out_data = b''
+ self.psm = psm # a non-zero psm is mainly for testing purpose
+ self.audio_sink = audio_sink
+ self.protocol_version = protocol_version
+
+ self.read_only_properties_characteristic = gatt.Characteristic(
+ gatt.GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
+ gatt.Characteristic.Properties.READ,
+ gatt.Characteristic.READABLE,
+ struct.pack(
+ "<BB8sBH2sH",
+ protocol_version,
+ capability,
+ self.hisyncid,
+ feature_map,
+ render_delay_milliseconds,
+ b'\x00\x00',
+ supported_codecs,
+ ),
+ )
+
+ self.audio_control_point_characteristic = gatt.Characteristic(
+ gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
+ gatt.Characteristic.Properties.WRITE
+ | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
+ gatt.Characteristic.WRITEABLE,
+ gatt.CharacteristicValue(write=self._on_audio_control_point_write),
+ )
+ self.audio_status_characteristic = gatt.Characteristic(
+ gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
+ gatt.Characteristic.Properties.READ | gatt.Characteristic.Properties.NOTIFY,
+ gatt.Characteristic.READABLE,
+ bytes([AudioStatus.OK]),
+ )
+ self.volume_characteristic = gatt.Characteristic(
+ gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
+ gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
+ gatt.Characteristic.WRITEABLE,
+ gatt.CharacteristicValue(write=self._on_volume_write),
+ )
+
+ # let the server find a free PSM
+ self.psm = device.create_l2cap_server(
+ spec=l2cap.LeCreditBasedChannelSpec(psm=self.psm, max_credits=8),
+ handler=self._on_connection,
+ ).psm
+ self.le_psm_out_characteristic = gatt.Characteristic(
+ gatt.GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
+ gatt.Characteristic.Properties.READ,
+ gatt.Characteristic.READABLE,
+ struct.pack('<H', self.psm),
+ )
+
+ characteristics = [
+ self.read_only_properties_characteristic,
+ self.audio_control_point_characteristic,
+ self.audio_status_characteristic,
+ self.volume_characteristic,
+ self.le_psm_out_characteristic,
+ ]
+
+ super().__init__(characteristics)
+
+ def get_advertising_data(self) -> bytes:
+ # Advertisement only uses 4 least significant bytes of the HiSyncId.
+ return bytes(
+ AdvertisingData(
+ [
+ (
+ AdvertisingData.SERVICE_DATA_16_BIT_UUID,
+ bytes(gatt.GATT_ASHA_SERVICE)
+ + bytes([self.protocol_version, self.capability])
+ + self.hisyncid[:4],
+ ),
+ ]
+ )
+ )
+
+ # Handler for audio control commands
+ async def _on_audio_control_point_write(
+ self, connection: Optional[Connection], value: bytes
+ ) -> None:
+ _logger.debug(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
+ opcode = value[0]
+ if opcode == OpCode.START:
+ # Start
+ self.active_codec = Codec(value[1])
+ self.audio_type = AudioType(value[2])
+ self.volume = value[3]
+ self.other_state = value[4]
+ _logger.debug(
+ f'### START: codec={self.active_codec.name}, '
+ f'audio_type={self.audio_type.name}, '
+ f'volume={self.volume}, '
+ f'other_state={self.other_state}'
+ )
+ self.emit('started')
+ elif opcode == OpCode.STOP:
+ _logger.debug('### STOP')
+ self.active_codec = None
+ self.audio_type = None
+ self.volume = None
+ self.other_state = None
+ self.emit('stopped')
+ elif opcode == OpCode.STATUS:
+ _logger.debug('### STATUS: %s', PeripheralStatus(value[1]).name)
+
+ if self.connection is None and connection:
+ self.connection = connection
+
+ def on_disconnection(_reason) -> None:
+ self.connection = None
+ self.active_codec = None
+ self.audio_type = None
+ self.volume = None
+ self.other_state = None
+ self.emit('disconnected')
+
+ connection.once('disconnection', on_disconnection)
+
+ # OPCODE_STATUS does not need audio status point update
+ if opcode != OpCode.STATUS:
+ await self.device.notify_subscribers(
+ self.audio_status_characteristic, force=True
+ )
+
+ # Handler for volume control
+ def _on_volume_write(self, connection: Optional[Connection], value: bytes) -> None:
+ _logger.debug(f'--- VOLUME Write:{value[0]}')
+ self.volume = value[0]
+ self.emit('volume_changed')
+
+ # Register an L2CAP CoC server
+ def _on_connection(self, channel: l2cap.LeCreditBasedChannel) -> None:
+ def on_data(data: bytes) -> None:
+ if self.audio_sink: # pylint: disable=not-callable
+ self.audio_sink(data)
+
+ channel.sink = on_data
+
+
+# -----------------------------------------------------------------------------
+class AshaServiceProxy(gatt_client.ProfileServiceProxy):
+ SERVICE_CLASS = AshaService
+ read_only_properties_characteristic: gatt_client.CharacteristicProxy
+ audio_control_point_characteristic: gatt_client.CharacteristicProxy
+ audio_status_point_characteristic: gatt_client.CharacteristicProxy
+ volume_characteristic: gatt_client.CharacteristicProxy
+ psm_characteristic: gatt_client.CharacteristicProxy
+
+ def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
+ self.service_proxy = service_proxy
+
+ for uuid, attribute_name in (
+ (
+ gatt.GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
+ 'read_only_properties_characteristic',
+ ),
+ (
+ gatt.GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
+ 'audio_control_point_characteristic',
+ ),
+ (
+ gatt.GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
+ 'audio_status_point_characteristic',
+ ),
+ (
+ gatt.GATT_ASHA_VOLUME_CHARACTERISTIC,
+ 'volume_characteristic',
+ ),
+ (
+ gatt.GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
+ 'psm_characteristic',
+ ),
+ ):
+ if not (
+ characteristics := self.service_proxy.get_characteristics_by_uuid(uuid)
+ ):
+ raise gatt.InvalidServiceError(f"Missing {uuid} Characteristic")
+ setattr(self, attribute_name, characteristics[0])
diff --git a/bumble/profiles/asha_service.py b/bumble/profiles/asha_service.py
deleted file mode 100644
index acbc47e..0000000
--- a/bumble/profiles/asha_service.py
+++ /dev/null
@@ -1,193 +0,0 @@
-# Copyright 2021-2022 Google LLC
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-# -----------------------------------------------------------------------------
-# Imports
-# -----------------------------------------------------------------------------
-import struct
-import logging
-from typing import List, Optional
-
-from bumble import l2cap
-from ..core import AdvertisingData
-from ..device import Device, Connection
-from ..gatt import (
- GATT_ASHA_SERVICE,
- GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
- GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
- GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
- GATT_ASHA_VOLUME_CHARACTERISTIC,
- GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
- TemplateService,
- Characteristic,
- CharacteristicValue,
-)
-from ..utils import AsyncRunner
-
-# -----------------------------------------------------------------------------
-# Logging
-# -----------------------------------------------------------------------------
-logger = logging.getLogger(__name__)
-
-
-# -----------------------------------------------------------------------------
-class AshaService(TemplateService):
- UUID = GATT_ASHA_SERVICE
- OPCODE_START = 1
- OPCODE_STOP = 2
- OPCODE_STATUS = 3
- PROTOCOL_VERSION = 0x01
- RESERVED_FOR_FUTURE_USE = [00, 00]
- FEATURE_MAP = [0x01] # [LE CoC audio output streaming supported]
- SUPPORTED_CODEC_ID = [0x02, 0x01] # Codec IDs [G.722 at 16 kHz]
- RENDER_DELAY = [00, 00]
-
- def __init__(self, capability: int, hisyncid: List[int], device: Device, psm=0):
- self.hisyncid = hisyncid
- self.capability = capability # Device Capabilities [Left, Monaural]
- self.device = device
- self.audio_out_data = b''
- self.psm = psm # a non-zero psm is mainly for testing purpose
-
- # Handler for volume control
- def on_volume_write(connection, value):
- logger.info(f'--- VOLUME Write:{value[0]}')
- self.emit('volume', connection, value[0])
-
- # Handler for audio control commands
- def on_audio_control_point_write(connection: Optional[Connection], value):
- logger.info(f'--- AUDIO CONTROL POINT Write:{value.hex()}')
- opcode = value[0]
- if opcode == AshaService.OPCODE_START:
- # Start
- audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
- logger.info(
- f'### START: codec={value[1]}, '
- f'audio_type={audio_type}, '
- f'volume={value[3]}, '
- f'otherstate={value[4]}'
- )
- self.emit(
- 'start',
- connection,
- {
- 'codec': value[1],
- 'audiotype': value[2],
- 'volume': value[3],
- 'otherstate': value[4],
- },
- )
- elif opcode == AshaService.OPCODE_STOP:
- logger.info('### STOP')
- self.emit('stop', connection)
- elif opcode == AshaService.OPCODE_STATUS:
- logger.info(f'### STATUS: connected={value[1]}')
-
- # OPCODE_STATUS does not need audio status point update
- if opcode != AshaService.OPCODE_STATUS:
- AsyncRunner.spawn(
- device.notify_subscribers(
- self.audio_status_characteristic, force=True
- )
- )
-
- self.read_only_properties_characteristic = Characteristic(
- GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
- Characteristic.Properties.READ,
- Characteristic.READABLE,
- bytes(
- [
- AshaService.PROTOCOL_VERSION, # Version
- self.capability,
- ]
- )
- + bytes(self.hisyncid)
- + bytes(AshaService.FEATURE_MAP)
- + bytes(AshaService.RENDER_DELAY)
- + bytes(AshaService.RESERVED_FOR_FUTURE_USE)
- + bytes(AshaService.SUPPORTED_CODEC_ID),
- )
-
- self.audio_control_point_characteristic = Characteristic(
- GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
- Characteristic.Properties.WRITE
- | Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
- Characteristic.WRITEABLE,
- CharacteristicValue(write=on_audio_control_point_write),
- )
- self.audio_status_characteristic = Characteristic(
- GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC,
- Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
- Characteristic.READABLE,
- bytes([0]),
- )
- self.volume_characteristic = Characteristic(
- GATT_ASHA_VOLUME_CHARACTERISTIC,
- Characteristic.Properties.WRITE_WITHOUT_RESPONSE,
- Characteristic.WRITEABLE,
- CharacteristicValue(write=on_volume_write),
- )
-
- # Register an L2CAP CoC server
- def on_coc(channel):
- def on_data(data):
- logging.debug(f'<<< data received:{data}')
-
- self.emit('data', channel.connection, data)
- self.audio_out_data += data
-
- channel.sink = on_data
-
- # let the server find a free PSM
- self.psm = device.create_l2cap_server(
- spec=l2cap.LeCreditBasedChannelSpec(psm=self.psm, max_credits=8),
- handler=on_coc,
- ).psm
- self.le_psm_out_characteristic = Characteristic(
- GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC,
- Characteristic.Properties.READ,
- Characteristic.READABLE,
- struct.pack('<H', self.psm),
- )
-
- characteristics = [
- self.read_only_properties_characteristic,
- self.audio_control_point_characteristic,
- self.audio_status_characteristic,
- self.volume_characteristic,
- self.le_psm_out_characteristic,
- ]
-
- super().__init__(characteristics)
-
- def get_advertising_data(self):
- # Advertisement only uses 4 least significant bytes of the HiSyncId.
- return bytes(
- AdvertisingData(
- [
- (
- AdvertisingData.SERVICE_DATA_16_BIT_UUID,
- bytes(GATT_ASHA_SERVICE)
- + bytes(
- [
- AshaService.PROTOCOL_VERSION,
- self.capability,
- ]
- )
- + bytes(self.hisyncid[:4]),
- ),
- ]
- )
- )
diff --git a/bumble/smp.py b/bumble/smp.py
index 5d6bcc5..c055e71 100644
--- a/bumble/smp.py
+++ b/bumble/smp.py
@@ -764,7 +764,9 @@
self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
# OOB
- self.oob_data_flag = 0 if pairing_config.oob is None else 1
+ self.oob_data_flag = (
+ 1 if pairing_config.oob and pairing_config.oob.peer_data else 0
+ )
# Set up addresses
self_address = connection.self_resolvable_address or connection.self_address
@@ -1014,8 +1016,10 @@
self.send_command(response)
def send_pairing_confirm_command(self) -> None:
- self.r = crypto.r()
- logger.debug(f'generated random: {self.r.hex()}')
+
+ if self.pairing_method != PairingMethod.OOB:
+ self.r = crypto.r()
+ logger.debug(f'generated random: {self.r.hex()}')
if self.sc:
@@ -1735,7 +1739,6 @@
if self.pairing_method in (
PairingMethod.JUST_WORKS,
PairingMethod.NUMERIC_COMPARISON,
- PairingMethod.OOB,
):
ra = bytes(16)
rb = ra
@@ -1743,6 +1746,22 @@
assert self.passkey
ra = self.passkey.to_bytes(16, byteorder='little')
rb = ra
+ elif self.pairing_method == PairingMethod.OOB:
+ if self.is_initiator:
+ if self.peer_oob_data:
+ rb = self.peer_oob_data.r
+ ra = self.r
+ else:
+ rb = bytes(16)
+ ra = self.r
+ else:
+ if self.peer_oob_data:
+ ra = self.peer_oob_data.r
+ rb = self.r
+ else:
+ ra = bytes(16)
+ rb = self.r
+
else:
return
diff --git a/examples/asha_sink.html b/examples/asha_sink.html
new file mode 100644
index 0000000..410fd1e
--- /dev/null
+++ b/examples/asha_sink.html
@@ -0,0 +1,95 @@
+<html data-bs-theme="dark">
+
+<head>
+ <link href="https://cdn.jsdelivr.net/npm/[email protected]/dist/css/bootstrap.min.css" rel="stylesheet"
+ integrity="sha384-T3c6CoIi6uLrA9TneNEoa7RxnatzjcDSCmG1MXxSR1GAsXEV/Dwwykc2MPK8M2HN" crossorigin="anonymous">
+ <script src="https://unpkg.com/pcm-player"></script>
+</head>
+
+<body>
+ <nav class="navbar navbar-dark bg-primary">
+ <div class="container">
+ <span class="navbar-brand mb-0 h1">Bumble ASHA Sink</span>
+ </div>
+ </nav>
+ <br>
+
+ <div class="container">
+
+ <div class="row">
+ <div class="col-auto">
+ <button id="connect-audio" class="btn btn-danger" onclick="connectAudio()">Connect Audio</button>
+ </div>
+ </div>
+
+ <hr>
+
+ <div class="row">
+ <div class="col-4">
+ <label class="form-label">Browser Gain</label>
+ <input type="range" class="form-range" id="browser-gain" min="0" max="2" value="1" step="0.1"
+ onchange="setGain()">
+ </div>
+ </div>
+
+ <hr>
+
+ <div id="socketStateContainer" class="bg-body-tertiary p-3 rounded-2">
+ <h3>Log</h3>
+ <code id="log" style="white-space: pre-line;"></code>
+ </div>
+ </div>
+
+
+ <script>
+ let atResponseInput = document.getElementById("at_response")
+ let gainInput = document.getElementById('browser-gain')
+ let log = document.getElementById("log")
+ let socket = new WebSocket('ws://localhost:8888');
+ let sampleRate = 0;
+ let player;
+
+ socket.binaryType = "arraybuffer";
+ socket.onopen = _ => {
+ log.textContent += 'SOCKET OPEN\n'
+ }
+ socket.onclose = _ => {
+ log.textContent += 'SOCKET CLOSED\n'
+ }
+ socket.onerror = (error) => {
+ log.textContent += 'SOCKET ERROR\n'
+ console.log(`ERROR: ${error}`)
+ }
+ socket.onmessage = function (message) {
+ if (typeof message.data === 'string' || message.data instanceof String) {
+ log.textContent += `<-- ${event.data}\n`
+ } else {
+ // BINARY audio data.
+ if (player == null) return;
+ player.feed(message.data);
+ }
+ };
+
+ function connectAudio() {
+ player = new PCMPlayer({
+ inputCodec: 'Int16',
+ channels: 1,
+ sampleRate: 16000,
+ flushTime: 20,
+ });
+ player.volume(gainInput.value);
+ const button = document.getElementById("connect-audio")
+ button.disabled = true;
+ button.textContent = "Audio Connected";
+ }
+
+ function setGain() {
+ if (player != null) {
+ player.volume(gainInput.value);
+ }
+ }
+ </script>
+ </div>
+</body>
+
+</html>
\ No newline at end of file
diff --git a/examples/asha_sink1.json b/examples/asha_sink1.json
index badef8b..dc383e8 100644
--- a/examples/asha_sink1.json
+++ b/examples/asha_sink1.json
@@ -1,5 +1,6 @@
{
"name": "Bumble Aid Left",
"address": "F1:F2:F3:F4:F5:F6",
+ "identity_address_type": 1,
"keystore": "JsonKeyStore"
-}
+}
\ No newline at end of file
diff --git a/examples/asha_sink2.json b/examples/asha_sink2.json
index 785d406..b8dc6b8 100644
--- a/examples/asha_sink2.json
+++ b/examples/asha_sink2.json
@@ -1,5 +1,6 @@
{
"name": "Bumble Aid Right",
"address": "F7:F8:F9:FA:FB:FC",
+ "identity_address_type": 1,
"keystore": "JsonKeyStore"
-}
+}
\ No newline at end of file
diff --git a/examples/run_asha_sink.py b/examples/run_asha_sink.py
index 105eb75..485e17e 100644
--- a/examples/run_asha_sink.py
+++ b/examples/run_asha_sink.py
@@ -16,192 +16,104 @@
# Imports
# -----------------------------------------------------------------------------
import asyncio
-import struct
import sys
import os
import logging
+import websockets
-from bumble import l2cap
+from typing import Optional
+
+from bumble import decoder
+from bumble import gatt
from bumble.core import AdvertisingData
-from bumble.device import Device
+from bumble.device import Device, AdvertisingParameters
from bumble.transport import open_transport_or_link
-from bumble.core import UUID
-from bumble.gatt import Service, Characteristic, CharacteristicValue
+from bumble.profiles import asha
+
+ws_connection: Optional[websockets.WebSocketServerProtocol] = None
+g722_decoder = decoder.G722Decoder()
-# -----------------------------------------------------------------------------
-# Constants
-# -----------------------------------------------------------------------------
-ASHA_SERVICE = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
-ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID(
- '6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties'
-)
-ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC = UUID(
- 'f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint'
-)
-ASHA_AUDIO_STATUS_CHARACTERISTIC = UUID(
- '38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus'
-)
-ASHA_VOLUME_CHARACTERISTIC = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
-ASHA_LE_PSM_OUT_CHARACTERISTIC = UUID(
- '2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT'
-)
+async def ws_server(ws_client: websockets.WebSocketServerProtocol, path: str):
+ del path
+ global ws_connection
+ ws_connection = ws_client
+
+ async for message in ws_client:
+ print(message)
# -----------------------------------------------------------------------------
async def main() -> None:
- if len(sys.argv) != 4:
- print(
- 'Usage: python run_asha_sink.py <device-config> <transport-spec> '
- '<audio-file>'
- )
- print('example: python run_asha_sink.py device1.json usb:0 audio_out.g722')
+ if len(sys.argv) != 3:
+ print('Usage: python run_asha_sink.py <device-config> <transport-spec>')
+ print('example: python run_asha_sink.py device1.json usb:0')
return
- audio_out = open(sys.argv[3], 'wb')
-
async with await open_transport_or_link(sys.argv[2]) as hci_transport:
device = Device.from_config_file_with_hci(
sys.argv[1], hci_transport.source, hci_transport.sink
)
- # Handler for audio control commands
- def on_audio_control_point_write(_connection, value):
- print('--- AUDIO CONTROL POINT Write:', value.hex())
- opcode = value[0]
- if opcode == 1:
- # Start
- audio_type = ('Unknown', 'Ringtone', 'Phone Call', 'Media')[value[2]]
- print(
- f'### START: codec={value[1]}, audio_type={audio_type}, '
- f'volume={value[3]}, otherstate={value[4]}'
- )
- elif opcode == 2:
- print('### STOP')
- elif opcode == 3:
- print(f'### STATUS: connected={value[1]}')
+ def on_audio_packet(packet: bytes) -> None:
+ global ws_connection
+ if ws_connection:
+ offset = 1
+ while offset < len(packet):
+ pcm_data = g722_decoder.decode_frame(packet[offset : offset + 80])
+ offset += 80
+ asyncio.get_running_loop().create_task(ws_connection.send(pcm_data))
+ else:
+ logging.info("No active client")
- # Respond with a status
- asyncio.create_task(
- device.notify_subscribers(audio_status_characteristic, force=True)
- )
-
- # Handler for volume control
- def on_volume_write(_connection, value):
- print('--- VOLUME Write:', value[0])
-
- # Register an L2CAP CoC server
- def on_coc(channel):
- def on_data(data):
- print('<<< Voice data received:', data.hex())
- audio_out.write(data)
-
- channel.sink = on_data
-
- server = device.create_l2cap_server(
- spec=l2cap.LeCreditBasedChannelSpec(max_credits=8), handler=on_coc
+ asha_service = asha.AshaService(
+ capability=0,
+ hisyncid=b'\x01\x02\x03\x04\x05\x06\x07\x08',
+ device=device,
+ audio_sink=on_audio_packet,
)
- print(f'### LE_PSM_OUT = {server.psm}')
-
- # Add the ASHA service to the GATT server
- read_only_properties_characteristic = Characteristic(
- ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC,
- Characteristic.Properties.READ,
- Characteristic.READABLE,
- bytes(
- [
- 0x01, # Version
- 0x00, # Device Capabilities [Left, Monaural]
- 0x01,
- 0x02,
- 0x03,
- 0x04,
- 0x05,
- 0x06,
- 0x07,
- 0x08, # HiSyncId
- 0x01, # Feature Map [LE CoC audio output streaming supported]
- 0x00,
- 0x00, # Render Delay
- 0x00,
- 0x00, # RFU
- 0x02,
- 0x00, # Codec IDs [G.722 at 16 kHz]
- ]
- ),
- )
- audio_control_point_characteristic = Characteristic(
- ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC,
- Characteristic.Properties.WRITE | Characteristic.WRITE_WITHOUT_RESPONSE,
- Characteristic.WRITEABLE,
- CharacteristicValue(write=on_audio_control_point_write),
- )
- audio_status_characteristic = Characteristic(
- ASHA_AUDIO_STATUS_CHARACTERISTIC,
- Characteristic.Properties.READ | Characteristic.Properties.NOTIFY,
- Characteristic.READABLE,
- bytes([0]),
- )
- volume_characteristic = Characteristic(
- ASHA_VOLUME_CHARACTERISTIC,
- Characteristic.WRITE_WITHOUT_RESPONSE,
- Characteristic.WRITEABLE,
- CharacteristicValue(write=on_volume_write),
- )
- le_psm_out_characteristic = Characteristic(
- ASHA_LE_PSM_OUT_CHARACTERISTIC,
- Characteristic.Properties.READ,
- Characteristic.READABLE,
- struct.pack('<H', server.psm),
- )
- device.add_service(
- Service(
- ASHA_SERVICE,
- [
- read_only_properties_characteristic,
- audio_control_point_characteristic,
- audio_status_characteristic,
- volume_characteristic,
- le_psm_out_characteristic,
- ],
- )
- )
+ device.add_service(asha_service)
# Set the advertising data
- device.advertising_data = bytes(
- AdvertisingData(
- [
- (AdvertisingData.COMPLETE_LOCAL_NAME, bytes(device.name, 'utf-8')),
- (AdvertisingData.FLAGS, bytes([0x06])),
- (
- AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
- bytes(ASHA_SERVICE),
- ),
- (
- AdvertisingData.SERVICE_DATA_16_BIT_UUID,
- bytes(ASHA_SERVICE)
- + bytes(
- [
- 0x01, # Protocol Version
- 0x00, # Capability
- 0x01,
- 0x02,
- 0x03,
- 0x04, # Truncated HiSyncID
- ]
+ advertising_data = (
+ bytes(
+ AdvertisingData(
+ [
+ (
+ AdvertisingData.COMPLETE_LOCAL_NAME,
+ bytes(device.name, 'utf-8'),
),
- ),
- ]
+ (AdvertisingData.FLAGS, bytes([0x06])),
+ (
+ AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
+ bytes(gatt.GATT_ASHA_SERVICE),
+ ),
+ ]
+ )
)
+ + asha_service.get_advertising_data()
)
# Go!
await device.power_on()
- await device.start_advertising(auto_restart=True)
+ await device.create_advertising_set(
+ auto_restart=True,
+ advertising_data=advertising_data,
+ advertising_parameters=AdvertisingParameters(
+ primary_advertising_interval_min=100,
+ primary_advertising_interval_max=100,
+ ),
+ )
- await hci_transport.source.wait_for_termination()
+ await websockets.serve(ws_server, port=8888)
+
+ await hci_transport.source.terminated
# -----------------------------------------------------------------------------
-logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
+logging.basicConfig(
+ level=os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper(),
+ format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S',
+)
asyncio.run(main())
diff --git a/tests/asha_test.py b/tests/asha_test.py
new file mode 100644
index 0000000..269e4a8
--- /dev/null
+++ b/tests/asha_test.py
@@ -0,0 +1,163 @@
+# Copyright 2021-2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import pytest
+import struct
+from unittest import mock
+
+from bumble import device as bumble_device
+from bumble.profiles import asha
+
+from .test_utils import TwoDevices
+
+# -----------------------------------------------------------------------------
+HI_SYNC_ID = b'\x00\x01\x02\x03\x04\x05\x06\x07'
+TIMEOUT = 0.1
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_get_only_properties():
+ devices = TwoDevices()
+ await devices.setup_connection()
+
+ asha_service = asha.AshaService(
+ hisyncid=HI_SYNC_ID,
+ device=devices[0],
+ protocol_version=0x01,
+ capability=0x02,
+ feature_map=0x03,
+ render_delay_milliseconds=0x04,
+ supported_codecs=0x05,
+ )
+ devices[0].add_service(asha_service)
+
+ async with bumble_device.Peer(devices.connections[1]) as peer:
+ asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
+ assert asha_client
+
+ read_only_properties = (
+ await asha_client.read_only_properties_characteristic.read_value()
+ )
+ (
+ protocol_version,
+ capabilities,
+ hi_sync_id,
+ feature_map,
+ render_delay_milliseconds,
+ _,
+ supported_codecs,
+ ) = struct.unpack("<BB8sBHHH", read_only_properties)
+ assert protocol_version == 0x01
+ assert capabilities == 0x02
+ assert hi_sync_id == HI_SYNC_ID
+ assert feature_map == 0x03
+ assert render_delay_milliseconds == 0x04
+ assert supported_codecs == 0x05
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_get_psm():
+ devices = TwoDevices()
+ await devices.setup_connection()
+
+ asha_service = asha.AshaService(
+ hisyncid=HI_SYNC_ID,
+ device=devices[0],
+ capability=0,
+ )
+ devices[0].add_service(asha_service)
+
+ async with bumble_device.Peer(devices.connections[1]) as peer:
+ asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
+ assert asha_client
+
+ psm = (await asha_client.psm_characteristic.read_value())[0]
+ assert psm == asha_service.psm
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_write_audio_control_point_start():
+ devices = TwoDevices()
+ await devices.setup_connection()
+
+ asha_service = asha.AshaService(
+ hisyncid=HI_SYNC_ID,
+ device=devices[0],
+ capability=0,
+ )
+ devices[0].add_service(asha_service)
+
+ async with bumble_device.Peer(devices.connections[1]) as peer:
+ asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
+ assert asha_client
+ status_notifications = asyncio.Queue()
+ await asha_client.audio_status_point_characteristic.subscribe(
+ status_notifications.put_nowait
+ )
+
+ start_cb = mock.MagicMock()
+ asha_service.on('started', start_cb)
+ await asha_client.audio_control_point_characteristic.write_value(
+ bytes(
+ [asha.OpCode.START, asha.Codec.G_722_16KHZ, asha.AudioType.MEDIA, 0, 1]
+ )
+ )
+ status = (await asyncio.wait_for(status_notifications.get(), TIMEOUT))[0]
+ assert status == asha.AudioStatus.OK
+
+ start_cb.assert_called_once()
+ assert asha_service.active_codec == asha.Codec.G_722_16KHZ
+ assert asha_service.volume == 0
+ assert asha_service.other_state == 1
+ assert asha_service.audio_type == asha.AudioType.MEDIA
+
+
+# -----------------------------------------------------------------------------
[email protected]
+async def test_write_audio_control_point_stop():
+ devices = TwoDevices()
+ await devices.setup_connection()
+
+ asha_service = asha.AshaService(
+ hisyncid=HI_SYNC_ID,
+ device=devices[0],
+ capability=0,
+ )
+ devices[0].add_service(asha_service)
+
+ async with bumble_device.Peer(devices.connections[1]) as peer:
+ asha_client = peer.create_service_proxy(asha.AshaServiceProxy)
+ assert asha_client
+ status_notifications = asyncio.Queue()
+ await asha_client.audio_status_point_characteristic.subscribe(
+ status_notifications.put_nowait
+ )
+
+ stop_cb = mock.MagicMock()
+ asha_service.on('stopped', stop_cb)
+ await asha_client.audio_control_point_characteristic.write_value(
+ bytes([asha.OpCode.STOP])
+ )
+ status = (await asyncio.wait_for(status_notifications.get(), TIMEOUT))[0]
+ assert status == asha.AudioStatus.OK
+
+ stop_cb.assert_called_once()
+ assert asha_service.active_codec is None
+ assert asha_service.volume is None
+ assert asha_service.other_state is None
+ assert asha_service.audio_type is None