blob: 3fb55fcade270767173d15b0a74309f344d5a3cf [file] [log] [blame]
# Copyright (C) 2024 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
import argparse
import datetime
import os
import subprocess
import time
from generate_test import generate_test
from parse_log import FullApduEntry, NfcType, PollingLoopEntry, open_and_parse_file, parse_timeframe, replace_aids
from pn532 import PN532
# Minimum amount of time between successive NFC transactions, to prevent the
# reader from being overloaded.
_MIN_SLEEP_TIME_SECONDS = 0.5
# Maximum amount of time allowed between successive NFC transactions, to allow
# for ease of use.
_MAX_SLEEP_TIME_SECONDS = 5
# NOTE(review): the two indices below are not referenced anywhere in the
# visible part of this file. Presumably they index an (expected, actual) pair
# of errors -- confirm against other users before relying on them.
_EXPECTED_ERROR_INDEX = 0
_ACTUAL_ERROR_INDEX = 1
# Number of times to try to obtain a tag before declaring failure.
_NUM_RETRIES = 50
# String templates for the output of a test case or snoop log.
# _APDU_OUTPUT_STR is filled with (timestamp, sent commands, received
# responses) and _POLLING_OUTPUT_STR with (timestamp, NFC type name) by
# output_line_for_snoop_log(); _ERROR_STR is filled with the error text.
# NOTE(review): _APDU_OUTPUT_TEST_STR is not used in the visible part of this
# file.
_APDU_OUTPUT_STR = "{}: APDU exchange: sent {}, received {}"
_APDU_OUTPUT_TEST_STR = "{}: APDU exchange: sent {} APDUs, received {} APDUs"
_POLLING_OUTPUT_STR = "{}: sent NFC data of type {}"
_ERROR_STR = " ERROR: {}"
# Width of the column for outputting the results of a test case.
# NOTE(review): not used in the visible part of this file.
_COLUMN_WIDTH = 80
# Directory for generated test cases and files for the emulator app. The path
# is resolved relative to the directory containing this script (see
# create_file_for_emulator_app()).
_EMULATOR_APP_PARSED_DIR = "src/com/android/nfc/emulatorapp/parsed_files/"
def send_nfc_a_data(reader: PN532) -> str | None:
    """Issues a single poll_a() call on the reader.

    Any exception raised by the call is captured and reported as text
    instead of propagating to the caller.

    Args:
      reader: The PN532 reader.

    Returns:
      The string form of the raised exception, or None if poll_a() completed
      without raising.
    """
    failure = None
    try:
        reader.poll_a()
    except Exception as exc:
        failure = str(exc)
    return failure
def send_nfc_b_data(reader: PN532) -> str | None:
    """Issues a single poll_b() call on the reader.

    Any exception raised by the call is captured and reported as text
    instead of propagating to the caller.

    Args:
      reader: The PN532 reader.

    Returns:
      The string form of the raised exception, or None if poll_b() completed
      without raising.
    """
    failure = None
    try:
        reader.poll_b()
    except Exception as exc:
        failure = str(exc)
    return failure
def send_unknown_data(reader: PN532, data: bytes) -> str | None:
    """Sends a custom polling frame through the reader.

    The reader is asked to poll_a(), then to broadcast the given frame, and
    is finally muted. The sequence stops at the first step that raises.

    Args:
      reader: The PN532 reader.
      data: The custom polling frame to be sent to the reader.

    Returns:
      The string form of the raised exception, or None if every step
      completed without raising.
    """
    failure = None
    try:
        reader.poll_a()
        reader.send_broadcast(data)
        reader.mute()
    except Exception as exc:
        failure = str(exc)
    return failure
def conduct_apdu_exchange(reader: PN532, current: FullApduEntry) -> str | None:
    """Conducts an APDU exchange between the emulator and the PN 532 module.

    The reader is polled up to _NUM_RETRIES times. As soon as a tag is
    returned, the commands in `current` are transacted against it together
    with the expected responses. If no tag is found on an attempt, the reader
    is muted before the next attempt.

    Args:
      reader: The PN532 reader.
      current: A data object containing the APDU commands to be sent to the
        emulator and the expected responses.

    Returns:
      None if the transaction succeeded; otherwise a description of the error:
      a mismatch message that includes the expected responses, "No tag
      received" if every attempt failed to find a tag, or the string form of
      any exception raised along the way.
    """
    try:
        for _ in range(_NUM_RETRIES):
            tag = reader.poll_a()
            if tag is not None:
                transacted = tag.transact(current.command, current.response)
                if not transacted:
                    # The template needs a "{}" placeholder; without it the
                    # expected response was silently dropped from the message.
                    return "Received incorrect response. Expected: {}".format(
                        current.response
                    )
                return None
            reader.mute()
        return "No tag received"
    except Exception as e:
        return str(e)
def replay_transaction(log, module_path: str):
    """Replays the given transaction log on the PN 532 module.

    Each entry is replayed after a pause derived from the gap between its
    timestamp and the previous entry's, clamped to the range
    [_MIN_SLEEP_TIME_SECONDS, _MAX_SLEEP_TIME_SECONDS]. A summary line is
    printed for every entry, and the reader is muted once the log is done.

    Args:
      log: The transaction log to be replayed.
      module_path: The serial path to the PN 532 module.
    """
    try:
        reader = PN532(module_path)
    except Exception:
        print("Could not connect to PN532 module")
        return

    if not log:
        return

    last_ts = log[0].ts
    for entry in log:
        # The timestamp delta is divided by 1,000,000 to obtain seconds
        # (presumably the timestamps are in microseconds -- confirm).
        gap_seconds = (entry.ts - last_ts) / 1000000
        time.sleep(
            min(max(gap_seconds, _MIN_SLEEP_TIME_SECONDS), _MAX_SLEEP_TIME_SECONDS)
        )

        error = None
        if isinstance(entry, PollingLoopEntry):
            if entry.type == NfcType.NFC_A:
                error = send_nfc_a_data(reader)
            elif entry.type == NfcType.NFC_B:
                error = send_nfc_b_data(reader)
            elif entry.type == NfcType.UNKNOWN:
                error = send_unknown_data(reader, entry.data)
        elif isinstance(entry, FullApduEntry):
            error = conduct_apdu_exchange(reader, entry)

        output_line_for_snoop_log(entry, error)
        # The next pause is measured from this entry.
        last_ts = entry.ts

    reader.mute()
def parse_snoop_log(args: argparse.Namespace):
    """Parses the snoop log file named by the command-line arguments.

    If the log will be used with the emulator app (either --replay_with_app
    or --parse_only was given), its AIDs are replaced via replace_aids(). The
    result is then filtered by parse_timeframe() using the optional start and
    end arguments.

    Args:
      args: The parsed command-line arguments. The attributes read are
        `file`, `replay_with_app`, `parse_only`, `start` and `end`.

    Returns:
      The parsed snoop log.
    """
    entries = open_and_parse_file(args.file)
    for_emulator_app = args.replay_with_app or args.parse_only
    if for_emulator_app:
        # Swap in the AIDs used by the emulator app.
        entries = replace_aids(entries)
    return parse_timeframe(entries, args.start, args.end)
def output_line_for_snoop_log(
    entry: PollingLoopEntry | FullApduEntry,
    error: str | None,
):
    """Prints a timestamped summary of one replayed snoop-log interaction.

    APDU exchanges are summarized with their commands and responses rendered
    as hex strings; any other entry is treated as a polling loop entry and
    summarized by the name of its NFC type. When an error is given, it is
    printed on a following line.

    Args:
      entry: The current interaction to be replayed from the snoop log.
      error: The error that occurred during the replayed (actual)
        transaction, or None if there was none.
    """
    # Current wall-clock time, truncated to whole microseconds.
    now_micros = int(float(datetime.datetime.now().timestamp() * 1000000))
    stamp = datetime.datetime.fromtimestamp(now_micros / 1000000).strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )

    if not isinstance(entry, FullApduEntry):
        # Anything that is not an APDU exchange is a PollingLoopEntry.
        summary = _POLLING_OUTPUT_STR.format(stamp, entry.type.name)
    else:
        sent = [command.hex() for command in entry.command]
        received = [response.hex() for response in entry.response]
        summary = _APDU_OUTPUT_STR.format(stamp, sent, received)
    print(summary)

    if error is not None:
        print(_ERROR_STR.format(error))
def print_opening_sequence(
file_name: str,
start: str | None = None,
end: str | None = None,
):
"""Prints the opening sequence for a test case or snoop log.
The name of the file to be replayed is displayed, along with the timeframe
that will be replayed, if specified by the user.
Args:
file_name: The name of the file to be replayed.
start: The start of the timeframe to be replayed.
end: The end of the timeframe to be replayed.
"""
print()
print("Replaying transaction from snoop log: {}".format(file_name))
if start is not None and end is not None:
print("Timeframe: {} - {}".format(start, end))
elif start is not None:
print("Timeframe: {} - end".format(start))
elif end is not None:
print("Timeframe: start - {}".format(end))
else:
print()
def create_file_for_emulator_app(
    output: list[PollingLoopEntry | FullApduEntry], filename: str
):
    """Creates a file containing the parsed APDU exchanges from a snoop log.

    The file is used to replay the transaction with an Android app installed
    on the emulator. One line is written per APDU exchange, in the form
    "<commands>;<responses>", where each side is a list of hex strings.
    Entries that are not APDU exchanges are skipped.

    Args:
      output: A list of polling loop entries and APDU exchanges parsed from
        the snoop log.
      filename: The name of the file to be created. This is near-identical to
        the name of the snoop log file: every "/" is replaced with "_" and
        the file is placed under _EMULATOR_APP_PARSED_DIR, relative to the
        directory containing this script.

    Raises:
      RuntimeError: If the output file cannot be opened.
    """
    local_path = _EMULATOR_APP_PARSED_DIR + filename.replace("/", "_")
    full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + local_path
    try:
        parsed_file = open(full_path, "wt")
    except Exception as e:
        raise RuntimeError(
            "Error occurred while opening file: {}".format(full_path)
        ) from e

    # The context manager closes (and therefore flushes) the file even if a
    # write fails; previously the handle was never closed.
    with parsed_file:
        for entry in output:
            if isinstance(entry, FullApduEntry):
                parsed_file.write(
                    "{};{}\n".format(
                        [command.hex() for command in entry.command],
                        [response.hex() for response in entry.response],
                    )
                )

    print()
    print("File for third party app generated at: {}".format(local_path))
def get_name_for_test_case(filename: str) -> str:
    """Derives the name of the generated test case from a snoop log path.

    Every "/" is removed from the path, then every occurrence of ".txt", and
    the result is prefixed with "Generated".

    Args:
      filename: The path to the snoop log file.

    Returns:
      The name of the test case.
    """
    without_slashes = "".join(filename.split("/"))
    stem = "".join(without_slashes.split(".txt"))
    return "Generated{}".format(stem)
def main():
    """Parses the command-line arguments and runs the requested scenario.

    After the snoop log is parsed, one of the following happens:
      1. --parse_only: the parsed APDU exchanges are written to a file for
         the emulator app and nothing is replayed.
      2. --generate_and_replay_test: a test case is generated from the log
         and immediately run through `atest`.
      3. Default: the transaction is replayed directly on the PN532 module.

    --path is optional for argparse, but the generate-and-replay scenario
    builds a test parameter from it, so a missing path is rejected up front
    with a usage error.
    """
    parser = argparse.ArgumentParser(prog="pn532")
    parser.add_argument(
        "-p",
        "--path",
        action="store",
        help="Path to the PN532 serial device, e.g. /dev/ttyUSB0",
    )
    parser.add_argument(
        "-f",
        "--file",
        action="store",
        required=True,
        help="Path to the file of the snoop log",
    )
    parser.add_argument(
        "--start",
        action="store",
        help="Start of the timeframe to be replayed",
    )
    parser.add_argument(
        "--end",
        action="store",
        help="End of the timeframe to be replayed",
    )
    parser.add_argument(
        "--parse_only",
        action="store_true",
        help="Parse the log without replaying the transaction",
    )
    parser.add_argument(
        "--replay_with_app",
        action="store_true",
        help="Replay the transaction with the emulator app",
    )
    parser.add_argument(
        "--generate_and_replay_test",
        action="store_true",
        help="Generate a test case from the log and then immediately run it",
    )
    args = parser.parse_args()

    # Without this check, the "pn532_serial_path=" + args.path concatenation
    # below raises TypeError when --path is omitted. --parse_only takes
    # precedence over --generate_and_replay_test, so it is exempt.
    if (
        args.generate_and_replay_test
        and not args.parse_only
        and args.path is None
    ):
        parser.error("--path is required with --generate_and_replay_test")

    parsed_snoop_log = parse_snoop_log(args)
    if args.parse_only:  # scenario 1: parse snoop log for the emulator app
        create_file_for_emulator_app(parsed_snoop_log, args.file)
    else:  # scenario 2: replay transaction from a snoop log
        print_opening_sequence(
            file_name=args.file,
            start=args.start,
            end=args.end,
        )
        if args.generate_and_replay_test:  # Replay the test that was just generated
            test_case_name = get_name_for_test_case(args.file)
            # generate_test()'s return value is passed to the test as its
            # file_path parameter (presumably a path string -- confirm).
            apdu_local_file = generate_test(parsed_snoop_log, test_case_name)
            test_command = [
                "atest",
                "-v",
                test_case_name,
                "--",
                "--testparam",
                "pn532_serial_path=" + args.path,
                "--testparam",
                "file_path=" + apdu_local_file,
            ]
            if args.replay_with_app:
                test_command += ["--testparam", "with_emulator_app=True"]
            subprocess.run(test_command)
        else:  # Default: replay the transaction
            replay_transaction(parsed_snoop_log, args.path)
# Run the command-line tool only when this file is executed as a script, not
# when it is imported as a module.
if __name__ == "__main__":
    main()