blob: 0880e2579948df78992d4864ba34b132ef8583d1 [file] [log] [blame]
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Counterpart to the Python example `run_scanner.py`.
//!
//! Device deduplication is done here rather than relying on the controller's filtering to provide
//! for additional features, like the ability to make deduplication time-bounded.
use bumble::{
adv::CommonDataType,
wrapper::{
core::AdvertisementDataUnit,
device::Device,
hci::{packets::AddressType, Address},
transport::Transport,
},
};
use clap::Parser as _;
use itertools::Itertools;
use owo_colors::{OwoColorize, Style};
use pyo3::PyResult;
use std::{
collections,
sync::{Arc, Mutex},
time,
};
#[pyo3_asyncio::tokio::main]
async fn main() -> PyResult<()> {
env_logger::builder()
.filter_level(log::LevelFilter::Info)
.init();
let cli = Cli::parse();
let transport = Transport::open(cli.transport).await?;
let address = Address::new("F0:F1:F2:F3:F4:F5", AddressType::RandomDeviceAddress)?;
let mut device = Device::with_hci("Bumble", address, transport.source()?, transport.sink()?)?;
// in practice, devices can send multiple advertisements from the same address, so we keep
// track of a timestamp for each set of data
let seen_advertisements = Arc::new(Mutex::new(collections::HashMap::<
Vec<u8>,
collections::HashMap<Vec<AdvertisementDataUnit>, time::Instant>,
>::new()));
let seen_adv_clone = seen_advertisements.clone();
device.on_advertisement(move |_py, adv| {
let rssi = adv.rssi()?;
let data_units = adv.data()?.data_units()?;
let addr = adv.address()?;
let show_adv = if cli.filter_duplicates {
let addr_bytes = addr.as_le_bytes()?;
let mut seen_adv_cache = seen_adv_clone.lock().unwrap();
let expiry_duration = time::Duration::from_secs(cli.dedup_expiry_secs);
let advs_from_addr = seen_adv_cache.entry(addr_bytes).or_default();
// we expect cache hits to be the norm, so we do a separate lookup to avoid cloning
// on every lookup with entry()
let show = if let Some(prev) = advs_from_addr.get_mut(&data_units) {
let expired = prev.elapsed() > expiry_duration;
*prev = time::Instant::now();
expired
} else {
advs_from_addr.insert(data_units.clone(), time::Instant::now());
true
};
// clean out anything we haven't seen in a while
advs_from_addr.retain(|_, instant| instant.elapsed() <= expiry_duration);
show
} else {
true
};
if !show_adv {
return Ok(());
}
let addr_style = if adv.is_connectable()? {
Style::new().yellow()
} else {
Style::new().red()
};
let (type_style, qualifier) = match adv.address()?.address_type()? {
AddressType::PublicIdentityAddress | AddressType::PublicDeviceAddress => {
(Style::new().cyan(), "")
}
_ => {
if addr.is_static()? {
(Style::new().green(), "(static)")
} else if addr.is_resolvable()? {
(Style::new().magenta(), "(resolvable)")
} else {
(Style::new().default_color(), "")
}
}
};
println!(
">>> {} [{:?}] {qualifier}:\n RSSI: {}",
addr.as_hex()?.style(addr_style),
addr.address_type()?.style(type_style),
rssi,
);
data_units.into_iter().for_each(|(code, data)| {
let matching = CommonDataType::for_type_code(code).collect::<Vec<_>>();
let code_str = if matching.is_empty() {
format!("0x{}", hex::encode_upper([code.into()]))
} else {
matching
.iter()
.map(|t| format!("{}", t))
.join(" / ")
.blue()
.to_string()
};
// use the first matching type's formatted data, if any
let data_str = matching
.iter()
.filter_map(|t| {
t.format_data(&data).map(|formatted| {
format!(
"{} {}",
formatted,
format!("(raw: 0x{})", hex::encode_upper(&data)).dimmed()
)
})
})
.next()
.unwrap_or_else(|| format!("0x{}", hex::encode_upper(&data)));
println!(" [{}]: {}", code_str, data_str)
});
Ok(())
})?;
device.power_on().await?;
// do our own dedup
device.start_scanning(false).await?;
// wait until user kills the process
tokio::signal::ctrl_c().await?;
Ok(())
}
#[derive(clap::Parser)]
#[command(author, version, about, long_about = None)]
struct Cli {
/// Bumble transport spec.
///
/// <https://google.github.io/bumble/transports/index.html>
#[arg(long)]
transport: String,
/// Filter duplicate advertisements
#[arg(long, default_value_t = false)]
filter_duplicates: bool,
/// How long before a deduplicated advertisement that hasn't been seen in a while is considered
/// fresh again, in seconds
#[arg(long, default_value_t = 10, requires = "filter_duplicates")]
dedup_expiry_secs: u64,
}