blob: b299657fd1fb16c3c091170cf182ab1752b3d59a [file] [log] [blame]
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Parses the snoop log to extract polling loop data and APDU exchanges."""
import base64
import dataclasses
import datetime
import enum
import os
import zlib
PREAMBLE_LENGTH = 9
HEADER_LENGTH = 7
SNOOP_LOG_START = "BEGIN:NFCSNOOP_"
SNOOP_LOG_END = "END:NFCSNOOP_"
# Bytes identifying the starts of polling loop and APDU transactions
POLLING_LOOP_START_BYTES = bytes.fromhex("6f0c")
APDU_START_BYTES = bytes.fromhex("6f02")
# Index corresponding to the total length of the APDU data packet
APDU_LEN_INDEX = 2
# Size of the main APDU header, which precedes either a list of APDU exchanges
# or a single APDU exchange
APDU_MAIN_HEADER_SIZE = 5
# Start of the APDU data, which follows the main header
APDU_DATA_START_INDEX = 6
# Minimum lengths of a valid APDU command and response
APDU_COMMAND_MIN_LENGTH = 13
APDU_RESPONSE_MIN_LENGTH = 10
# Sequence of bytes that identifies an APDU transaction
APDU_IDENTIFIER = bytes([0x20, 0x00])
# Byte that identifies an APDU command
APDU_COMMAND_IDENTIFIER = 0x19
# Byte that identifies an APDU response
APDU_RESPONSE_IDENTIFIER = 0x08
# Sequence of bytes that identifies whether the APDU command or response was
# the first command or response in a longer list of APDU commands or responses
APDU_ORDER_FIRST = 0x02
APDU_ORDER_FIRST_ALT = bytes([0x0A, 0x00])
APDU_ORDER_SECOND = 0x03
APDU_ORDER_SECOND_ALT = bytes([0x0B, 0x00])
# Sequence of bytes that identifies the start of a "SELECT AID" APDU command
AID_START_BYTES = bytes.fromhex("00A40400")
# AID groups that are used by the emulator app
SELECT_AID_FIRST = bytes.fromhex("00A4040008A000000151000000")
SELECT_AID_SECOND = bytes.fromhex("00A4040008A000000003000000")
class NfcType(enum.Enum):
NFC_A = 0
NFC_B = 1
NFC_F = 2
NFC_V = 3
REMOTE_FIELD = 4
UNKNOWN = 5
@dataclasses.dataclass
class PollingLoopEntry:
ts: int = 0
type: NfcType = NfcType.UNKNOWN
data: bytes = b""
error: str | None = None
@dataclasses.dataclass
class PartialApduEntry:
ts: int = 0
is_command: bool = False
data: bytes = b"" # data sent by the APDU command or response
is_first: bool = (
True
# whether this is the first entry in the list of APDU commands or responses sent together
)
@dataclasses.dataclass
class FullApduEntry:
ts: int = 0
command: list[bytes] = dataclasses.field(default_factory=lambda: [])
response: list[bytes] = dataclasses.field(default_factory=lambda: [])
error: str | None = None
def replace_aids(
log: list[PollingLoopEntry | FullApduEntry],
) -> list[PollingLoopEntry | FullApduEntry]:
"""Replaces the AIDs in the log with the AIDs that are used by the emulator app."""
new_log: list[PollingLoopEntry | FullApduEntry] = []
for cur in log:
is_first_aid = True
if isinstance(cur, FullApduEntry):
new_apdu_entry = FullApduEntry(
ts=cur.ts, command=[], response=cur.response
)
for cmd in cur.command:
if cmd.startswith(AID_START_BYTES):
if is_first_aid:
new_apdu_entry.command.append(SELECT_AID_FIRST)
is_first_aid = False
else:
new_apdu_entry.command.append(SELECT_AID_SECOND)
else:
new_apdu_entry.command.append(cmd)
new_log.append(new_apdu_entry)
else:
new_log.append(cur)
return new_log
def parse_timeframe(log, start, end):
"""Returns a subset of the log that falls within the given timeframe."""
if start is None and end is None:
return log
parsed_log = log
if start is not None:
start_dt = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S.%f")
start_ts = int(float(datetime.datetime.timestamp(start_dt)) * 1000000)
parsed_log = list(filter(lambda x: x.ts >= start_ts, log))
if end is not None:
end_dt = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S.%f")
end_ts = int(float(datetime.datetime.timestamp(end_dt)) * 1000000)
parsed_log = list(filter(lambda x: x.ts <= end_ts, parsed_log))
return parsed_log
def standardize_log(
log: list[PollingLoopEntry | PartialApduEntry],
) -> list[PollingLoopEntry | FullApduEntry]:
"""Standardizes the log to ensure that it can be replayed by the PN 532 module.
This includes removing redundant calls to polling loop A and combining APDU
commands and responses into a single entry.
Args:
log: The log to be standardized.
Returns:
The standardized log.
"""
cmds = []
rsps = []
last_ts = 0
standardized: list[PollingLoopEntry | FullApduEntry] = []
for cur in log:
if isinstance(cur, PollingLoopEntry):
if cur.type == NfcType.NFC_A or cur.type == NfcType.NFC_B:
standardized.append(cur)
elif cur.type == NfcType.UNKNOWN:
if not standardized:
standardized.append(cur)
else:
standardized[-1] = cur
elif isinstance(cur, PartialApduEntry):
if cur.is_command:
if len(cmds) == len(rsps) + 1: # extra command without response
rsps.append(b"")
if len(cmds) == len(rsps) != 0 and cur.data.startswith(AID_START_BYTES):
standardized.append(FullApduEntry(last_ts, cmds, rsps))
cmds = []
rsps = []
cmds.append(cur.data)
else:
if len(cmds) == len(rsps): # extra response without command
continue
rsps.append(cur.data)
last_ts = cur.ts
# handle last command and response
if len(cmds) == len(rsps) + 1:
rsps.append(b"")
if len(cmds) == len(rsps) != 0:
standardized.append(FullApduEntry(last_ts, cmds, rsps))
return standardized
def parse_file(data: bytes) -> list[PollingLoopEntry | PartialApduEntry]:
"""Parses the file to extract polling loop data and APDU exchanges."""
if not data:
raise RuntimeError("No data found in file")
version = data[0]
if version != 1:
raise RuntimeError("Unsupported version: {}".format(version))
offset = PREAMBLE_LENGTH
header_length = HEADER_LENGTH
pts_offset = 2
polling_list = []
ts = calculate_timestamp(data)
while len(data) - offset > header_length:
# length of the current transaction in bytes
length = data[offset] + (data[offset + 1] << 8)
# duration between the last transaction and the current one
pts = bytearray(data[offset + pts_offset : offset + pts_offset + 4])
pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24)
ts += pts_real
transaction_type = (data[offset + header_length] & 0xE0) >> 5
if transaction_type == 3: # ST_NTF or NCI_NTF transactions
cur_data = data[offset + header_length : offset + header_length + length]
if cur_data.startswith(POLLING_LOOP_START_BYTES):
polling_list.extend(add_polling_data(cur_data, ts))
elif cur_data.startswith(APDU_START_BYTES):
apdu_transactions = find_apdu_transactions(cur_data, ts)
polling_list.extend(apdu_transactions)
offset += header_length + length
return polling_list
def open_and_parse_file(
file_path: str,
) -> list[PollingLoopEntry | FullApduEntry]:
"""Opens the file that contains the unparsed snoop log and parses it.
Args:
file_path: The path of the file containing the unparsed snoop log.
Returns:
A list of polling loop entries and APDU exchanges parsed from the file.
Raises:
RuntimeError: If the file cannot be found.
"""
snoop_file = open_read_file(file_path)
str_data = ""
found_log = False
while line := snoop_file.readline():
if not found_log and SNOOP_LOG_START in line:
found_log = True
elif found_log:
if SNOOP_LOG_END in line:
break
str_data += line
snoop_bytes = inflate(base64.b64decode(str_data))
parsed = parse_file(snoop_bytes)
return standardize_log(parsed)
def find_apdu_transactions(data: bytes, ts: int) -> list[PartialApduEntry]:
"""Finds all APDU transactions in the given data."""
total_size = data[APDU_LEN_INDEX]
if total_size < APDU_MAIN_HEADER_SIZE or data[4:6] != APDU_IDENTIFIER:
return []
apdus: list[PartialApduEntry] = []
index = APDU_DATA_START_INDEX
while index < len(data):
cur_size = data[index + 1]
cur_data = data[index : index + cur_size + 2]
cmd, is_first = parse_apdu_command(cur_data) or (None, None)
if cmd is not None:
apdus.append(
PartialApduEntry(ts=ts, is_command=True, data=cmd, is_first=is_first)
)
else:
rsp, is_first = parse_apdu_response(cur_data) or (None, None)
if rsp is not None:
apdus.append(
PartialApduEntry(
ts=ts, is_command=False, data=rsp, is_first=is_first
)
)
index += cur_size + 2
return apdus
def parse_apdu_command(data: bytes):
"""Isolate the bytes sent from the reader to the emulator.
Args:
data: The raw APDU command in bytes.
Returns:
the data sent by the APDU command, or none if it is not a valid APDU
command.
"""
if len(data) < APDU_COMMAND_MIN_LENGTH:
return None
if data[0] != APDU_COMMAND_IDENTIFIER:
return None
if data[1] != len(data) - 2:
return None
if data[5:7] != bytes.fromhex("0000"):
return None
if data[8] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]:
is_first = True if data[8] == APDU_ORDER_FIRST else False
return data[9:-4], is_first
elif data[8:10] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]:
is_first = True if data[8:10] == APDU_ORDER_FIRST_ALT else False
return data[10:-4], is_first
return None
def parse_apdu_response(data: bytes):
"""Isolates the data sent from the emulator to the reader.
Args:
data: The raw APDU response in bytes.
Returns:
the data sent by the APDU response, or none if it is not a valid APDU
response.
"""
if len(data) < APDU_RESPONSE_MIN_LENGTH:
return None
if data[0] != APDU_RESPONSE_IDENTIFIER:
return None
if data[1] != len(data) - 2:
return None
if data[5] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]:
is_first = True if data[5] == APDU_ORDER_FIRST else False
return data[6:-4], is_first
elif data[5:7] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]:
is_first = True if data[5:7] == APDU_ORDER_FIRST_ALT else False
return data[7:-4], is_first
elif data[7] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]:
is_first = True if data[7] == APDU_ORDER_FIRST else False
return data[8:-4], is_first
return None
def add_polling_data(data: bytes, ts: int) -> list[PollingLoopEntry]:
"""Adds polling data to the list of transactions.
Each entry may contain multiple polling data transactions.
Args:
data: The raw polling data in bytes.
ts: The timestamp of the polling transaction.
Returns:
A list of polling data transactions.
"""
transaction_list = []
count = 4
while count < len(data):
flag = data[count]
match flag:
case 0:
entry_type = NfcType.REMOTE_FIELD
case 1:
entry_type = NfcType.NFC_A
case 2:
entry_type = NfcType.NFC_B
case _:
entry_type = NfcType.UNKNOWN
length = data[count + 2] - 5
polling_data = data[count + 8 : count + 8 + length]
transaction_list.append(
PollingLoopEntry(
ts=ts,
type=entry_type,
data=polling_data,
)
)
count += 8 + length
return transaction_list
def calculate_timestamp(data: bytes) -> int:
"""Calculates the timestamp of the first frame in the log."""
ts = data[1:9]
ts_real = (
ts[0]
+ (ts[1] << 8)
+ (ts[2] << 16)
+ (ts[3] << 24)
+ (ts[4] << 32)
+ (ts[5] << 40)
+ (ts[6] << 48)
+ (ts[7] << 56)
)
offset = PREAMBLE_LENGTH
while (len(data) - offset) > HEADER_LENGTH:
length = data[offset] + (data[offset + 1] << 8)
pts = bytearray(data[offset + 2 : offset + 6])
pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24)
ts_real -= pts_real
offset += HEADER_LENGTH + length
return ts_real
def inflate(data: bytes) -> bytes:
"""Inflates decompressed data."""
decompressed = zlib.decompressobj().decompress(data[PREAMBLE_LENGTH:])
return data[0:PREAMBLE_LENGTH] + decompressed
def open_read_file(file_path: str):
"""Opens the file at the given path.
Args:
file_path: The path of the file to be opened. This can be either a local
path or an absolute path.
Returns:
An object representing the opened file.
Raises:
RuntimeError: If the file cannot be opened.
"""
full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + file_path
if os.path.exists(file_path):
file_to_open = file_path
elif os.path.exists(full_path):
file_to_open = full_path
else:
raise RuntimeError("File not found: {}".format(file_path))
try:
return open(file_to_open, "rt")
except Exception as e:
raise RuntimeError(
"Error occurred while opening file: {}".format(file_path)
) from e