blob: 41dcbf3475fef5ace200f3583c7f097fa88fc078 [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! HCI
use itertools::Itertools as _;
use pyo3::{
exceptions::PyException, intern, types::PyModule, FromPyObject, PyAny, PyErr, PyObject,
PyResult, Python, ToPyObject,
};
/// HCI error code.
pub struct HciErrorCode(u8);
impl<'source> FromPyObject<'source> for HciErrorCode {
fn extract(ob: &'source PyAny) -> PyResult<Self> {
Ok(HciErrorCode(ob.extract()?))
}
}
impl ToPyObject for HciErrorCode {
fn to_object(&self, py: Python<'_>) -> PyObject {
self.0.to_object(py)
}
}
/// Provides helpers for interacting with HCI
pub struct HciConstant;
impl HciConstant {
/// Human-readable error name
pub fn error_name(status: HciErrorCode) -> PyResult<String> {
Python::with_gil(|py| {
PyModule::import(py, intern!(py, "bumble.hci"))?
.getattr(intern!(py, "HCI_Constant"))?
.call_method1(intern!(py, "error_name"), (status.0,))?
.extract()
})
}
}
/// A Bluetooth address
pub struct Address(pub(crate) PyObject);
impl Address {
/// The type of address
pub fn address_type(&self) -> PyResult<AddressType> {
Python::with_gil(|py| {
let addr_type = self
.0
.getattr(py, intern!(py, "address_type"))?
.extract::<u32>(py)?;
let module = PyModule::import(py, intern!(py, "bumble.hci"))?;
let klass = module.getattr(intern!(py, "Address"))?;
if addr_type
== klass
.getattr(intern!(py, "PUBLIC_DEVICE_ADDRESS"))?
.extract::<u32>()?
{
Ok(AddressType::PublicDevice)
} else if addr_type
== klass
.getattr(intern!(py, "RANDOM_DEVICE_ADDRESS"))?
.extract::<u32>()?
{
Ok(AddressType::RandomDevice)
} else if addr_type
== klass
.getattr(intern!(py, "PUBLIC_IDENTITY_ADDRESS"))?
.extract::<u32>()?
{
Ok(AddressType::PublicIdentity)
} else if addr_type
== klass
.getattr(intern!(py, "RANDOM_IDENTITY_ADDRESS"))?
.extract::<u32>()?
{
Ok(AddressType::RandomIdentity)
} else {
Err(PyErr::new::<PyException, _>("Invalid address type"))
}
})
}
/// True if the address is static
pub fn is_static(&self) -> PyResult<bool> {
Python::with_gil(|py| {
self.0
.getattr(py, intern!(py, "is_static"))?
.extract::<bool>(py)
})
}
/// True if the address is resolvable
pub fn is_resolvable(&self) -> PyResult<bool> {
Python::with_gil(|py| {
self.0
.getattr(py, intern!(py, "is_resolvable"))?
.extract::<bool>(py)
})
}
/// Address bytes in _little-endian_ format
pub fn as_le_bytes(&self) -> PyResult<Vec<u8>> {
Python::with_gil(|py| {
self.0
.call_method0(py, intern!(py, "to_bytes"))?
.extract::<Vec<u8>>(py)
})
}
/// Address bytes as big-endian colon-separated hex
pub fn as_hex(&self) -> PyResult<String> {
self.as_le_bytes().map(|bytes| {
bytes
.into_iter()
.rev()
.map(|byte| hex::encode_upper([byte]))
.join(":")
})
}
}
/// BT address types
#[allow(missing_docs)]
#[derive(PartialEq, Eq, Debug)]
pub enum AddressType {
PublicDevice,
RandomDevice,
PublicIdentity,
RandomIdentity,
}