blob: a9f89c59d3f4b8649381d5ca6282ec9cb50c633a [file] [log] [blame]
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
import logging
import struct
from typing import Optional, Dict
from enum import IntEnum
from binascii import hexlify
import serial
from serial.tools.list_ports import comports
from mobly import logger as mobly_logger
from .tag import TypeATag, TypeBTag
from .nfcutils import ByteStruct, snake_to_camel, s_to_us
from .nfcutils.reader import Reader, CONFIGURATION_A_LONG
_LONG_PREAMBLE = bytes(20)
_ACK_FRAME = bytes.fromhex("0000ff00ff00")
_SOF = bytes.fromhex("0000ff")
_BITRATE = {106: 0b000, 212: 0b001, 424: 0b010, 848: 0b011}
# Framing values defined in PN532_C1, 8.6.23
_FRAMING = {"A": 0b00, "DEP": 0b01, "F": 0b10, "B": 0b11}
# Timeout values defined in UM0701-02, Table 17,
# from 100 µs (n=1) up to 3.28 sec (n=16)
_TIMEOUT = {n: 100 * 2 ** (n - 1) for n in range(0x01, 0x10)}
# Picked manually, might not be the best combinations
_POWER_LEVELS_TO_P_N_DRIVER_CONFIGS = {
# No frames should be detected
0: (0b000000, 0b0000),
# A, F detected with gain 1-3
20: (0b000001, 0b0001),
# A, F detected with gain 4-5
40: (0b000010, 0b0010),
# A, F detected with gain 5-6
60: (0b000011, 0b0100),
# A, F, detected with gain 7-8
80: (0b001000, 0b1000),
# A, B, F detected with gain 9
100: (0b111111, 0b1111)
}
class Command(IntEnum):
"""https://www.nxp.com/docs/en/user-guide/141520.pdf
UM0701-02
"""
DIAGNOSE = 0x00
GET_FIRMWARE_VERSION = 0x02
GET_GENERAL_STATUS = 0x04
READ_REGISTER = 0x06
WRITE_REGISTER = 0x08
SAM_CONFIGURATION = 0x14
POWER_DOWN = 0x16
RF_CONFIGURATION = 0x32
IN_JUMP_FOR_DEP = 0x56
IN_JUMP_FOR_PSL = 0x46
IN_LIST_PASSIVE_TARGET = 0x4A
IN_DATA_EXCHANGE = 0x40
IN_COMMUNICATE_THRU = 0x42
IN_DESELECT = 0x44
IN_RELEASE = 0x52
IN_SELECT = 0x54
IN_AUTO_POLL = 0x60
TG_INIT_AS_TARGET = 0x8C
TG_SET_GENERAL_BYTES = 0x92
TG_GET_DATA = 0x86
TG_SET_DATA = 0x8E
TG_SET_METADATA = 0x94
TG_GET_INITIATOR_COMMAND = 0x88
TG_RESPONSE_TO_INITIATOR = 0x90
TG_GET_TARGET_STATUS = 0x8A
_BS = ByteStruct.of
class Register(IntEnum):
"""https://www.nxp.com/docs/en/nxp/data-sheets/PN532_C1.pdf
PN532/C1
8.6.22 CIU memory map
8.7.1 Standard registers
"""
structure: ByteStruct
def __new__(cls, address: int, structure: Optional[ByteStruct] = None):
obj = int.__new__(cls, address)
obj._value_ = address
obj.structure = None
return obj
def __init__(self, _, structure: Optional[ByteStruct] = None):
# When initializing, we already know the name, so we're able to
# generate a nice name for matching ByteStruct
name = snake_to_camel(self.name.lower(), lower=False)
self.structure = ByteStruct.of(
name, **(structure.fields if structure else {"value": (7, 0)})
)
COMMAND = 0x6331
COMM_I_EN = 0x6332
DIV_I_EN = 0x6333
COMM_I_RQ = 0x6334
DIV_I_RQ = 0x6335
ERROR = 0x6336
WATER_LEVEL = 0x633B
# (8.6.23.14) Control
CONTROL = 0x633C, _BS(
t_stop_now=7,
t_start_now=6,
wr_nfcip1_id_to_fifo=5,
initiator=4,
rfu=3,
rx_last_bits=(2, 0),
)
# (8.6.23.15) BitFraming
BIT_FRAMING = 0x633D, _BS(
start_send=7, rx_align=(6, 4), rfu=3, tx_last_bits=(2, 0)
)
# (8.6.23.16) Coll
COLL = 0x633E, _BS(
values_after_coll=7,
rfu=6,
coll_pos_not_valid=5,
coll_pos=(4, 0),
)
# (8.6.23.17) Mode
MODE = 0x6301, _BS(
msb_first=7,
detect_sync=6,
tx_wait_rf=5,
rx_wait_rf=4,
pol_sigin=3,
mode_det_off=2,
crc_preset=(1, 0),
)
# (8.6.23.18) TxMode
TX_MODE = 0x6302, _BS(
crc_en=7, speed=(6, 4), inv_mod=3, mix=2, framing=(1, 0)
)
# (8.6.23.19) RxMode
RX_MODE = 0x6303, _BS(
crc_en=7, speed=(6, 4), no_err=3, multiple=2, framing=(1, 0)
)
# (8.6.23.20) TxControl
TX_CONTROL = 0x6304, _BS(
inv_tx2_rf_on=7,
inv_tx1_rf_on=6,
inv_tx2_rf_off=5,
inv_tx1_rf_off=4,
tx2_cw=3,
check_rf=2,
tx2_rf_en=1,
tx1_rf_en=0,
)
# (8.6.23.21) TxAuto
TX_AUTO = 0x6305, _BS(
auto_rf_off=7,
force_100_ask=6,
auto_wake_up=5,
rfu=4,
ca_on=3,
initial_rf_on=2,
tx2_rf_auto_en=1,
tx1_rf_auto_en=0,
)
TX_SEL = 0x6306
RX_SEL = 0x6307
# (8.6.23.24) RxThreshold
RX_THRESHOLD = 0x6308, _BS(min_level=(7, 4), rfu=3, col_level=(2, 0))
# (8.6.23.25) Demod
DEMOD = 0x6309, _BS(
add_iq=(7, 6), fix_iq=5, tau_rcv=(3, 2), tau_sync=(1, 0)
)
MANUAL_RCV = 0x630D
TYPE_B = 0x630E
# (8.6.23.33) GsNOff
GS_N_OFF = 0x6313, _BS(cw_gs_n_off=(7, 4), mod_gs_n_off=(3, 0))
# (8.6.23.34) ModWidth
MOD_WIDTH = 0x6314, _BS(mod_width=(7, 0))
# (8.6.23.35) TxBitPhase
TX_BIT_PHASE = 0x6315, _BS(rcv_clk_change=7, tx_bit_phase=(6, 0))
# (8.6.23.36) RfCfg
RF_CFG = 0x6316, _BS(rf_level_amp=7, rx_gain=(6, 4), rf_level=(3, 0))
# (8.6.23.37) GsNOn
GS_N_ON = 0x6317, _BS(cw_gs_n_on=(7, 4), mod_gs_n_on=(3, 0))
# (8.6.23.38) CWGsP
CW_GS_P = 0x6318, _BS(rfu=(7, 6), cw_gs_p=(5, 0))
# (8.6.23.39) ModGsP
MOD_GS_P = 0x6319, _BS(rfu=(7, 6), mod_gs_p=(5, 0))
REG = Register
REGISTER_VALUES_FOR_TRANSCEIVE = {
# The following registers are configured for transmit
# based on register states after using IN_LIST_PASSIVE_TARGET
REG.CONTROL: REG.CONTROL.structure(initiator=True),
REG.TX_CONTROL: REG.TX_CONTROL.structure(
inv_tx2_rf_on=True,
tx2_rf_en=True,
tx1_rf_en=True,
),
REG.RX_THRESHOLD: REG.RX_THRESHOLD.structure(
min_level=0b1000, col_level=0b101
),
REG.GS_N_OFF: REG.GS_N_OFF.structure(
cw_gs_n_off=0b0110, mod_gs_n_off=0b1111
),
}
class RFConfigItem(IntEnum):
"""https://www.nxp.com/docs/en/user-guide/141520.pdf
UM0701-02
7.3.1 RFConfiguration
"""
RF_FIELD = 0x01 # ConfigurationData
VARIOUS_TIMINGS = 0x02 # RFU, fATR_RES_Timeout, fRetryTimeout
# 0x03 RFU
MAX_RTY_COM = 0x04 # MaxRtyCOM
MAX_RETRIES = 0x05 # MxRtyATR, MxRtyPSL, MxRtyPassiveActivation
class BrTy(IntEnum):
"""https://www.nxp.com/docs/en/user-guide/141520.pdf
UM0701-02
7.3.5 InListPassiveTarget
"""
# InitiatorData is optional and may contain a UID to initialize
TYPE_A_106 = 0x00
# InitiatorData contains "Polling" command payload
TYPE_F_212 = 0x01
TYPE_F_424 = 0x02
# InitiatorData contains AFI and optional polling method byte
TYPE_B_106 = 0x03
# InitiatorData field unused
TYPE_A_JEWEL = 0x04
class Status(IntEnum):
"""https://www.nxp.com/docs/en/user-guide/141520.pdf
UM0701-02
7.1 Error Handling
"""
OK = 0x00
TIME_OUT = 0x01
CRC_ERROR = 0x02
PARITY_ERROR = 0x03
ERRONEOUS_BIT_COUNT = 0x04
MIFARE_FRAMING_ERROR = 0x05
BIT_COLLISION = 0x06
COMMUNICATION_BUFFER_SIZE_INSUFFICIENT = 0x07
RF_BUFFER_OVERFLOW = 0x09
RF_PROTOCOL_ERROR = 0x0B
TEMPERATURE_ERROR = 0x0D
INTERNAL_BUFFER_OVERFLOW = 0x0E
INVALID_PARAMETER = 0x10
class PN532(Reader):
"""Implements an NFC reader with a PN532 chip"""
def __init__(self, path):
"""Initializes device on path,
or first available serial port if none is provided."""
if len(comports()) == 0:
raise IndexError(
"Could not find device on serial port"
+ ", make sure reader is plugged in."
)
if len(path) == 0: # Iterate through available ports to locate PN532
found_port = False
for port in comports():
if self.verify_firmware_for_port(port.device):
found_port = True
break
if not found_port:
raise RuntimeError(
"Could not verify PN532 firmware on any available serial ports"
)
else:
if not self.verify_firmware_for_port(path):
raise RuntimeError(
"Could not verify PN532 firmware on serial path " + path
)
self.sam_configuration(mode=0x01, timeout_value=0x00)
self.write_long_preamble = False
# Disable retries
self.device.flushInput()
self.rf_configuration(
RFConfigItem.MAX_RETRIES,
[
0x00, # MxRtyATR
0x00, # MxRtyPSL
0x00, # MxRtyPassiveActivation
],
)
# Custom functions
def verify_firmware_for_port(self, path):
"""
Verifies that a PN532 reader is attached to the given serial port path.
:param path: Serial path
:return: True if a PN532 reader is located at the path; false otherwise.
"""
self.register_cache = {}
self.rf_configuration_cache = {}
self.write_long_preamble = True
self.log = mobly_logger.PrefixLoggerAdapter(
logging.getLogger(),
{
mobly_logger.PrefixLoggerAdapter.EXTRA_KEY_LOG_PREFIX: (
f"[PN532|{path}]"
)
},
)
self.log.debug("Serial port: %s", path)
self.device = serial.Serial(path, 115200, timeout=0.5)
self.device.flush()
self._send_ack_frame()
self.device.flushInput()
try:
return self.verify_firmware_version()
except Exception as e:
return False
def poll_a(self):
"""Attempts to detect target for NFC type A."""
self.log.debug("Polling A")
tag = self.in_list_passive_target(br_ty=BrTy.TYPE_A_106)
if tag:
self.log.debug(f"Got Type A tag, SEL_RES={tag.sel_res}")
return tag
def poll_b(self):
"""Attempts to detect target for NFC type B."""
self.log.debug("Polling B")
afi = 0x00
tag = self.in_list_passive_target(
br_ty=BrTy.TYPE_B_106, initiator_data=(afi,)
)
if tag:
self.log.debug(f"Got Type B tag {tag.sensb_res}")
return tag
def send_broadcast(
self,
data: bytes,
*,
configuration=CONFIGURATION_A_LONG
):
"""Emits a broadcast frame into the polling loop"""
self.log.debug("Sending broadcast %s", hexlify(data).decode())
return self.transceive_raw(
data=data,
type_=configuration.type,
crc=configuration.crc,
bits=configuration.bits,
bitrate=configuration.bitrate,
timeout=configuration.timeout or 0.25,
power_level=configuration.power,
)
def mute(self):
"""Turns off device's RF antenna."""
self.log.debug("Muting")
self.rf_configuration(RFConfigItem.RF_FIELD, [0b10])
def unmute(self, auto_rf_ca=False):
"""Turns on device's RF antenna."""
self.log.debug("Unmuting")
self.rf_configuration(RFConfigItem.RF_FIELD, [(auto_rf_ca << 1) + 0b01])
def reset(self):
"""Clears out input and output buffers to expunge leftover data"""
self.device.reset_input_buffer()
self.device.reset_output_buffer()
# Special commands
def transceive_raw(
self,
data,
type_="A",
crc=True,
bits=8,
bitrate=106,
*,
timeout=1,
power_level=100,
cache_configuration=True,
):
"""Configures the CIU with specified configuration and sends raw data
:param timeout: Timeout in seconds
:param cache_configuration: if true, prevents redundant writes & reads
"""
# Choose the least index of timeout duration
# where result >= given value. Timeout is in μs.
# If timeout value is too big, or <= 0,
# fall back to maximum timeout duration
timeout_index = next(
(idx for idx, t in _TIMEOUT.items() if t >= s_to_us(timeout)), 0x10
)
self.rf_configuration(
RFConfigItem.VARIOUS_TIMINGS,
[
0x00, # RFU
0x0B, # ATR_RES TimeOut, default value is 0x0B
timeout_index,
],
cache=cache_configuration,
)
p_n_config = next(
(
config
for power, config in _POWER_LEVELS_TO_P_N_DRIVER_CONFIGS.items()
if power >= power_level
),
_POWER_LEVELS_TO_P_N_DRIVER_CONFIGS[0]
)
p_driver_conductance_level, n_driver_conductance_level = p_n_config
(
tx_mode, rx_mode, tx_auto, bit_frm,
gs_n_on, cw_gs_p, md_gs_p,
rf_cfg, tx_bit_phase, demod,
) = self.read_registers(
REG.TX_MODE, REG.RX_MODE, REG.TX_AUTO, REG.BIT_FRAMING,
REG.GS_N_ON, REG.CW_GS_P, REG.MOD_GS_P,
REG.RF_CFG, REG.TX_BIT_PHASE, REG.DEMOD,
cache=cache_configuration,
)
# The following register modifications are based on register state
# modifications when performing IN_LIST_PASSIVE_TARGET and communication
registers_to_write = {
REG.TX_MODE: tx_mode.replace(
crc_en=crc, speed=_BITRATE[bitrate], framing=_FRAMING[type_]
),
REG.RX_MODE: rx_mode.replace(
crc_en=crc, speed=_BITRATE[bitrate], framing=_FRAMING[type_]
),
REG.TX_AUTO: tx_auto.replace(force_100_ask=type_ == "A"),
REG.BIT_FRAMING: bit_frm.replace(tx_last_bits=bits & 0b111),
REG.GS_N_ON: gs_n_on.replace(
mod_gs_n_on=0b0100 if type_ == "A" else 0b1111,
cw_gs_n_on=n_driver_conductance_level & 0b1111,
),
REG.CW_GS_P: cw_gs_p.replace(
cw_gs_p=p_driver_conductance_level & 0b111111
),
REG.MOD_GS_P: md_gs_p.replace(
mod_gs_p=0b010111 if type_ == "B" else 0b010001
),
REG.RF_CFG: rf_cfg.replace(
rx_gain=0b110 if type_ == "F" else 0b101,
rf_level=0b1001,
),
REG.TX_BIT_PHASE: tx_bit_phase.replace(
rcv_clk_change=1,
tx_bit_phase=0b1111 if type_ == "F" else 0b0111,
),
REG.DEMOD: demod.replace(
add_iq=0b01,
tau_rcv=0b00 if type_ == "F" else 0b11,
tau_sync=0b01,
),
**REGISTER_VALUES_FOR_TRANSCEIVE,
}
self.write_registers(registers_to_write, cache=cache_configuration)
# Handle a special case for FeliCa, where length byte has to be present
if type_ == "F":
data = [len(data) + 1, *data]
# No data is OK for this use case
return self.in_communicate_thru(data, raise_on_error_status=False)
def verify_firmware_version(self):
"""Verifies we are talking to a PN532."""
self.log.debug("Checking firmware version")
rsp = self.get_firmware_version()
return rsp[0] == 0x32
# PN532 defined commands
def initialize_target_mode(self):
"""Configures the PN532 as target."""
self.log.debug("Initializing target mode")
self._execute_command(
Command.TG_INIT_AS_TARGET,
[
0x05, # Mode
0x04, # SENS_RES (2 bytes)
0x00,
0x12, # nfcid1T (3 BYTES)
0x34,
0x56,
0x20, # SEL_RES
0x00, # FeliCAParams[] (18 bytes)
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00, # NFCID3T[] (10 bytes)
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00,
0x00, # LEN Gt
0x00, # LEN Tk
],
)
def sam_configuration(self, mode=0x01, timeout_value=0x00):
"""(7.2.10) SAMConfiguration"""
return self._execute_command(
Command.SAM_CONFIGURATION,
[mode, timeout_value],
timeout=1,
min_response=0,
)
def get_firmware_version(self):
"""(7.2.2) GetFirmwareVersion"""
return self._execute_command(
Command.GET_FIRMWARE_VERSION, min_response=4
)
def in_data_exchange(
self, tg, data, *, timeout=3, raise_on_error_status=True
):
"""(7.3.8) InDataExchange"""
rsp = self._execute_command(
Command.IN_DATA_EXCHANGE,
[tg, *data],
timeout=timeout,
)
if rsp is None or rsp[0] != Status.OK:
if raise_on_error_status:
raise RuntimeError(f"No response to {data}; {rsp}")
self.log.error("Got error exchanging data")
return None
return rsp[1:]
def in_communicate_thru(
self, data, *, timeout=1, raise_on_error_status=True
):
"""(7.3.9) InCommunicateThru"""
rsp = self._execute_command(
Command.IN_COMMUNICATE_THRU, data, min_response=1, timeout=timeout
)
if rsp[0] != Status.OK:
if raise_on_error_status:
raise RuntimeError(f"No response to {data}; {rsp}")
return None
return rsp[1:]
def in_list_passive_target(
self, br_ty: BrTy, initiator_data: bytes = b"", max_tg=1
):
"""(7.3.5) InListPassiveTarget
If max_tg=1, returns a tag or None if none was found,
Otherwise, returns a list
"""
# Reset cache values as IN_LIST_PASSIVE_TARGET modifies them
self.register_cache = {}
self.rf_configuration_cache = {}
rsp = self._execute_command(
Command.IN_LIST_PASSIVE_TARGET,
[max_tg, br_ty, *initiator_data],
min_response=1,
)
if rsp[0] == 0:
return [] if max_tg > 1 else None
data = rsp[1:]
tags = []
offset = 0
tag_for_brty = {
BrTy.TYPE_A_106: TypeATag,
BrTy.TYPE_B_106: TypeBTag
}
if br_ty not in tag_for_brty:
raise RuntimeError(f"BrTy {br_ty} not supported yet")
while offset <= len(data) - 1:
tag, offset = tag_for_brty[br_ty].from_target_data(
self, data[offset:]
)
tags.append(tag)
if max_tg == 1:
return tags[0]
return tags
def read_registers(self, *registers: Register, cache=False):
"""(7.2.4) ReadRegister:
Reads CIU registers
:param registers: an iterable containing addresses of registers to read
:param cache: prevents redundant register reads
"""
if cache and all(
Register(register) in self.register_cache for register in registers
):
return [self.register_cache[register] for register in registers]
data = b"".join(struct.pack(">H", register) for register in registers)
rsp = self._execute_command(Command.READ_REGISTER, data)
if not rsp:
raise RuntimeError(f"No response for read registers {registers}.")
return list(
register.structure(byte) for byte, register in zip(rsp, registers)
)
def write_registers(
self, registers: Dict[Register, int], cache=False
) -> None:
"""(7.2.5) WriteRegister:
Writes CIU registers
:param registers: dictionary containing key-value pairs
of register addresses and values to be written
:param cache: prevents redundant register writes
"""
# If not caching, assume all are different
difference = {
reg: val
for reg, val in registers.items()
if not cache or self.register_cache.get(reg) != val
}
if not difference:
return
data = b"".join(
struct.pack(">HB", reg, val) for reg, val in difference.items()
)
self._execute_command(Command.WRITE_REGISTER, data)
self.register_cache = {**self.register_cache, **registers}
def rf_configuration(
self, cfg_item: RFConfigItem, value: int, *, cache=False
):
"""(7.3.1) RFConfiguration
Applies settings to one of the available configuration items
:param cache: prevents redundant config writes
"""
if cache and self.rf_configuration_cache.get(cfg_item) == value:
return
self._execute_command(
Command.RF_CONFIGURATION, [cfg_item, *value], min_response=0
)
self.rf_configuration_cache[cfg_item] = value
# Internal communication commands
def _execute_command(
self, command: Command, data=b"", *, timeout=0.5, min_response=None
):
"""Executes the provided command
Verifies that response code matches the command code if response arrived
If min_response is set, checks if enough data was returned
"""
rsp = self._send_frame(
self._construct_frame([command, *data]), timeout=timeout
)
if not rsp:
if min_response is not None:
raise RuntimeError(f"No response for {command.name}; {rsp}")
return None
if rsp[0] != command + 1:
raise RuntimeError(
f"Response code {rsp[0]} does not match the command {command}"
)
del rsp[0]
if isinstance(min_response, int) and len(rsp) < min_response:
raise RuntimeError(
f"Got unexpected response for {command.name}"
+ f"; Length mismatch {len(rsp)} < {min_response}"
+ f"; {bytes(rsp).hex()}"
)
return rsp
# Protocol communication methods
def _construct_frame(self, data):
"""Construct a data fram to be sent to the PN532."""
# Preamble, start code, length, length checksum, TFI
frame = [
0x00,
0x00,
0xFF,
(len(data) + 1) & 0xFF,
((~(len(data) + 1) & 0xFF) + 0x01) & 0xFF,
0xD4,
]
data_sum = 0xD4
# Add data to frame
for b in data:
data_sum += b
frame.append(b)
frame.append(((~data_sum & 0xFF) + 0x01) & 0xFF) # Data checksum
frame.append(0x00) # Postamble
self.log.debug(
"Constructed frame " + hexlify(bytearray(frame)).decode()
)
return bytearray(frame)
def _write(self, frame):
"""Performs serial writes
while handling config for sending long preambles"""
if self.write_long_preamble:
frame = _LONG_PREAMBLE + frame
self.device.write(frame)
def _send_frame(self, frame, timeout=0.5):
"""Writes a frame to the device and returns the response."""
self._write(frame)
return self._get_device_response(timeout)
def _send_ack_frame(self, timeout=0.5):
"""Send ACK frame, there is no response."""
self.device.timeout = timeout
self._write(_ACK_FRAME)
def _get_device_response(self, timeout=0.5):
"""onfirms we get an ACK frame from device.
Reads response frame, and writes ACK.
"""
self.device.timeout = timeout
frame = bytearray(self.device.read(6))
if (len(frame)) == 0:
self.log.error("Did not get response from PN532")
return None
if bytes(frame) != _ACK_FRAME:
self.log.error(
"Did not get ACK frame, got %s", hexlify(frame).decode()
)
frame = bytearray(self.device.read(6))
if (len(frame)) == 0:
return None
if bytes(frame[0:3]) != _SOF:
self.log.error(
"Unexpected start to frame, got %s",
hexlify(frame[0:3]).decode(),
)
data_len = frame[3]
length_checksum = frame[4]
if (length_checksum + data_len) & 0xFF != 0:
self.log.error("Frame failed length checksum")
return None
tfi = frame[5]
if tfi != 0xD5:
self.log.error(
"Unexpected TFI byte when performing read, got %02x", frame[5]
)
return None
data_packet = bytearray(
self.device.read(data_len - 1)
) # subtract one since length includes TFI byte.
data_checksum = bytearray(self.device.read(1))[0]
if (tfi + sum(data_packet) + data_checksum) & 0xFF != 0:
self.log.error("Frame failed data checksum")
postamble = bytearray(self.device.read(1))[0]
if postamble != 0x00:
if tfi != 0xD5:
self.log.error(
"Unexpected postamble byte when performing read, got %02x",
frame[4],
)
self._send_ack_frame()
self.log.debug(
"Received frame %s%s",
hexlify(frame).decode(),
hexlify(data_packet).decode(),
)
return data_packet