blob: 31acd501be3eda6b3458886e5747829d6bda7d46 [file] [log] [blame]
// Copyright 2021, The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! NCI Protocol Abstraction Layer
//! Supports sending NCI commands to the HAL and receiving
//! NCI messages back
use bytes::{BufMut, BytesMut};
use log::{debug, error};
use nfc_hal::{Hal, HalEventRegistry};
use nfc_packets::nci::DataPacketChild::Payload;
use nfc_packets::nci::NciPacketChild;
use nfc_packets::nci::NotificationChild::ConnCreditsNotification;
use nfc_packets::nci::{Command, DataPacket, DataPacketBuilder, Notification};
use nfc_packets::nci::{Opcode, PacketBoundaryFlag, Response};
use pdl_runtime::Packet;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use tokio::select;
use tokio::sync::mpsc::{channel, Receiver, Sender, UnboundedSender};
use tokio::sync::{oneshot, RwLock};
use tokio::time::{sleep, Duration, Instant};
pub mod api;
/// Result type
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>;
/// Initialize the module and connect the channels
pub async fn init() -> Nci {
let hc = nfc_hal::init().await;
// Channel to handle data upstream messages
// let (in_data_int, in_data_ext) = channel::<DataPacket>(10);
// Internal data channels
// let ic = InternalChannels { in_data_int };
let (cmd_tx, cmd_rx) = channel::<QueuedCommand>(10);
let commands = CommandSender { cmd_tx };
let hal_events = hc.hal_events.clone();
let notifications = EventRegistry { handlers: Arc::new(Mutex::new(HashMap::new())) };
let connections = LogicalConnectionsRegistry {
conns: Arc::new(RwLock::new(HashMap::new())),
sender: hc.out_data_tx.clone(),
};
tokio::spawn(dispatch(notifications, connections.clone(), hc, cmd_rx));
Nci { hal_events, commands, connections }
}
/// NCI module external interface
pub struct Nci {
/// HAL events
pub hal_events: HalEventRegistry,
/// NCI command communication interface
pub commands: CommandSender,
/// NCI logical connections
pub connections: LogicalConnectionsRegistry,
}
#[derive(Debug)]
struct PendingCommand {
cmd: Command,
response: oneshot::Sender<Response>,
}
#[derive(Debug)]
struct QueuedCommand {
pending: PendingCommand,
notification: Option<oneshot::Sender<Notification>>,
}
/// Sends raw commands. Only useful for facades & shims, or wrapped as a CommandSender.
pub struct CommandSender {
cmd_tx: Sender<QueuedCommand>,
}
/// The data returned by send_notify() method.
pub struct ResponsePendingNotification {
/// Command response
pub response: Response,
/// Pending notification receiver
pub notification: oneshot::Receiver<Notification>,
}
impl CommandSender {
/// Send a command, but do not expect notification to be returned
pub async fn send(&mut self, cmd: Command) -> Result<Response> {
let (tx, rx) = oneshot::channel::<Response>();
self.cmd_tx
.send(QueuedCommand {
pending: PendingCommand { cmd, response: tx },
notification: None,
})
.await?;
let event = rx.await?;
Ok(event)
}
/// Send a command which expects notification as a result
pub async fn send_and_notify(&mut self, cmd: Command) -> Result<ResponsePendingNotification> {
let (tx, rx) = oneshot::channel::<Response>();
let (ntx, nrx) = oneshot::channel::<Notification>();
self.cmd_tx
.send(QueuedCommand {
pending: PendingCommand { cmd, response: tx },
notification: Some(ntx),
})
.await?;
let event = rx.await?;
Ok(ResponsePendingNotification { response: event, notification: nrx })
}
}
impl Drop for CommandSender {
fn drop(&mut self) {
debug!("CommandSender is dropped");
}
}
/// Parameters of a logical connection
struct ConnectionParameters {
callback: Option<fn(u8, u16, &[u8])>,
max_payload_size: u8,
nfcc_credits_avail: u8,
sendq: VecDeque<DataPacket>,
recvq: VecDeque<DataPacket>,
}
impl ConnectionParameters {
/// Flush TX queue
fn flush_tx(&mut self) {
self.sendq.clear();
}
}
/// To keep track of currentry open logical connections
#[derive(Clone)]
pub struct LogicalConnectionsRegistry {
conns: Arc<RwLock<HashMap<u8, Mutex<ConnectionParameters>>>>,
sender: UnboundedSender<DataPacket>,
}
impl LogicalConnectionsRegistry {
/// Create a logical connection
pub async fn open(
&mut self,
conn_id: u8,
cb: Option<fn(u8, u16, &[u8])>,
max_payload_size: u8,
nfcc_credits_avail: u8,
) {
let conn_params = ConnectionParameters {
callback: cb,
max_payload_size,
nfcc_credits_avail,
sendq: VecDeque::<DataPacket>::new(),
recvq: VecDeque::<DataPacket>::new(),
};
assert!(
self.conns.write().await.insert(conn_id, Mutex::new(conn_params)).is_none(),
"A logical connection with id {:?} already exists",
conn_id
);
}
/// Set static callback
pub async fn set_static_callback(&mut self, conn_id: u8, cb: Option<fn(u8, u16, &[u8])>) {
if conn_id < 2 && cb.is_some() {
// Static connections
if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
let mut conn_params = conn_params.lock().unwrap();
conn_params.callback = cb;
}
}
}
/// Close a logical connection
pub async fn close(&mut self, conn_id: u8) -> Option<fn(u8, u16, &[u8])> {
if let Some(conn_params) = self.conns.write().await.remove(&conn_id) {
conn_params.lock().unwrap().callback
} else {
None
}
}
/// Add credits to a logical connection
pub async fn add_credits(&self, conn_id: u8, ncreds: u8) {
if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
let mut conn_params = conn_params.lock().unwrap();
conn_params.nfcc_credits_avail += ncreds;
while !conn_params.sendq.is_empty() && conn_params.nfcc_credits_avail > 0 {
self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap();
conn_params.nfcc_credits_avail -= 1;
}
}
}
/// Send a packet to a logical channel, splitting it if needed
pub async fn send_packet(&mut self, conn_id: u8, pkt: DataPacket) {
if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
let mut conn_params = conn_params.lock().unwrap();
if let Payload(mut p) = pkt.specialize() {
if p.len() > conn_params.max_payload_size.into() {
let conn_id = pkt.get_conn_id();
while p.len() > conn_params.max_payload_size.into() {
let part = DataPacketBuilder {
conn_id,
pbf: PacketBoundaryFlag::Incomplete,
cr: 0,
payload: Some(p.split_to(conn_params.max_payload_size.into())),
}
.build();
conn_params.sendq.push_back(part);
}
if !p.is_empty() {
let end = DataPacketBuilder {
conn_id,
pbf: PacketBoundaryFlag::CompleteOrFinal,
cr: 0,
payload: Some(p),
}
.build();
conn_params.sendq.push_back(end);
}
} else {
conn_params.sendq.push_back(pkt);
}
}
while conn_params.nfcc_credits_avail > 0 && !conn_params.sendq.is_empty() {
self.sender.send(conn_params.sendq.pop_front().unwrap()).unwrap();
conn_params.nfcc_credits_avail -= 1;
}
}
}
/// Send data packet callback to the upper layers
pub async fn send_callback(&self, pkt: DataPacket) {
let conn_id = pkt.get_conn_id();
let ncreds = pkt.get_cr();
if ncreds > 0 {
self.add_credits(conn_id, ncreds).await;
}
let done = pkt.get_pbf() == PacketBoundaryFlag::CompleteOrFinal;
if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
let mut conn_params = conn_params.lock().unwrap();
if !done && conn_params.recvq.is_empty() {
const NFC_DATA_START_CEVT: u16 = 5;
let cb = conn_params.callback.unwrap();
cb(conn_id, NFC_DATA_START_CEVT, &[]);
}
conn_params.recvq.push_back(pkt);
if done {
const NFC_DATA_CEVT_SIZE: usize = 4; // 3 for header and 1 for status
let cap = conn_params.recvq.len() * conn_params.max_payload_size as usize
+ NFC_DATA_CEVT_SIZE;
let mut buffer = BytesMut::with_capacity(cap);
buffer.put_u8(0u8); // status
let pkt = conn_params.recvq.pop_front().unwrap();
buffer.put(pkt.encode_to_bytes().unwrap());
while !conn_params.recvq.is_empty() {
let pkt = conn_params.recvq.pop_front().unwrap();
if let Payload(p) = pkt.specialize() {
buffer.put(p);
}
}
let data_cevt = buffer.freeze();
let cb = conn_params.callback.unwrap();
const NFC_DATA_CEVT: u16 = 3;
cb(conn_id, NFC_DATA_CEVT, data_cevt.as_ref());
}
}
}
/// Flush outgoing data queue
pub async fn flush_data(&mut self, conn_id: u8) -> bool {
if let Some(conn_params) = self.conns.read().await.get(&conn_id) {
conn_params.lock().unwrap().flush_tx();
true
} else {
false
}
}
}
/// Provides ability to register and unregister for NCI notifications
#[derive(Clone)]
pub struct EventRegistry {
handlers: Arc<Mutex<HashMap<Opcode, oneshot::Sender<Notification>>>>,
}
impl EventRegistry {
/// Indicate interest in specific NCI notification
pub async fn register(&mut self, code: Opcode, sender: oneshot::Sender<Notification>) {
assert!(
self.handlers.lock().unwrap().insert(code, sender).is_none(),
"A handler for {:?} is already registered",
code
);
}
/// Remove interest in specific NCI notification
pub async fn unregister(&mut self, code: Opcode) -> Option<oneshot::Sender<Notification>> {
self.handlers.lock().unwrap().remove(&code)
}
}
async fn dispatch(
mut ntfs: EventRegistry,
lcons: LogicalConnectionsRegistry,
mut hc: Hal,
// ic: InternalChannels,
mut cmd_rx: Receiver<QueuedCommand>,
) -> Result<()> {
let mut pending: Option<PendingCommand> = None;
let timeout = sleep(Duration::MAX);
// The max_deadline is used to set the sleep() deadline to a very distant moment in
// the future, when the notification from the timer is not required.
let max_deadline = timeout.deadline();
tokio::pin!(timeout);
loop {
select! {
Some(cmd) = hc.in_cmd_rx.recv() => {
match cmd.specialize() {
NciPacketChild::Response(rsp) => {
timeout.as_mut().reset(max_deadline);
let this_opcode = rsp.get_cmd_op();
match pending.take() {
Some(PendingCommand{cmd, response}) if cmd.get_op() == this_opcode => {
if let Err(e) = response.send(rsp) {
error!("failure dispatching command status {:?}", e);
}
},
Some(PendingCommand{cmd, ..}) => panic!("Waiting for {:?}, got {:?}", cmd.get_op(), this_opcode),
None => panic!("Unexpected status event with opcode {:?}", this_opcode),
}
},
NciPacketChild::Notification(ntfy) => {
match ntfy.specialize() {
ConnCreditsNotification(ccnp) => {
let conns = ccnp.get_conns();
for conn in conns {
lcons.add_credits(conn.conn_id, conn.ncredits).await;
}
},
_ => {
let code = ntfy.get_cmd_op();
match ntfs.unregister(code).await {
Some(sender) => {
if let Err(e) = sender.send(ntfy) {
error!("notification channel closed {:?}", e);
}
},
None => panic!("Unhandled notification {:?}", code),
}
},
}
},
_ => error!("Unexpected NCI data received {:?}", cmd),
}
},
qc = cmd_rx.recv(), if pending.is_none() => if let Some(queued) = qc {
debug!("cmd_rx got a q");
if let Some(nsender) = queued.notification {
ntfs.register(queued.pending.cmd.get_op(), nsender).await;
}
if let Err(e) = hc.out_cmd_tx.send(queued.pending.cmd.clone().into()) {
error!("command queue closed: {:?}", e);
}
timeout.as_mut().reset(Instant::now() + Duration::from_millis(20));
pending = Some(queued.pending);
} else {
break;
},
() = &mut timeout => {
error!("Command processing timeout");
timeout.as_mut().reset(max_deadline);
pending = None;
},
Some(data) = hc.in_data_rx.recv() => lcons.send_callback(data).await,
else => {
debug!("Select is done");
break;
},
}
}
debug!("NCI dispatch is terminated.");
Ok(())
}