| # Copyright (C) 2024 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # Lint as: python3 |
| |
| import argparse |
| import datetime |
| import os |
| import subprocess |
| import time |
| from generate_test import generate_test |
| from parse_log import FullApduEntry, NfcType, PollingLoopEntry, open_and_parse_file, parse_timeframe, replace_aids |
| from pn532 import PN532 |
| |
| # Minimum amount of time between successive NFC transactions, to prevent the |
| # reader from being overloaded. |
| _MIN_SLEEP_TIME_SECONDS = 0.5 |
| |
| # Maximum amount of time allowed between successive NFC transactions, to allow |
| # for ease of use. |
| _MAX_SLEEP_TIME_SECONDS = 5 |
| |
| _EXPECTED_ERROR_INDEX = 0 |
| _ACTUAL_ERROR_INDEX = 1 |
| |
| # Number of times to try to obtain a tag before declaring failure. |
| _NUM_RETRIES = 50 |
| |
| # String templates for the output of a test case or snoop log. |
| _APDU_OUTPUT_STR = "{}: APDU exchange: sent {}, received {}" |
| _APDU_OUTPUT_TEST_STR = "{}: APDU exchange: sent {} APDUs, received {} APDUs" |
| _POLLING_OUTPUT_STR = "{}: sent NFC data of type {}" |
| _ERROR_STR = " ERROR: {}" |
| |
| # Width of the column for outputting the results of a test case. |
| _COLUMN_WIDTH = 80 |
| |
| # Directory for generated test cases and files for the emulator app. |
| _EMULATOR_APP_PARSED_DIR = "src/com/android/nfc/emulatorapp/parsed_files/" |
| |
| |
| def send_nfc_a_data(reader: PN532) -> str | None: |
| """Calls poll_a() on the reader. |
| |
| Args: |
| reader: The PN532 reader. |
| |
| Returns: |
| the error, if one occurs |
| """ |
| try: |
| reader.poll_a() |
| except Exception as e: |
| return e.__str__() |
| |
| |
| def send_nfc_b_data(reader: PN532) -> str | None: |
| """Calls poll_b() on the reader. |
| |
| Args: |
| reader: The PN532 reader. |
| |
| Returns: |
| the error, if one occurs |
| """ |
| try: |
| reader.poll_b() |
| except Exception as e: |
| return e.__str__() |
| |
| |
| def send_unknown_data(reader: PN532, data: bytes) -> str | None: |
| """Sends a custom polling frame to the reader. |
| |
| Args: |
| reader: The PN532 reader. |
| data: The custom polling frame to be sent to the reader. |
| |
| Returns: |
| the error, if one occurs |
| """ |
| try: |
| reader.poll_a() |
| reader.send_broadcast(data) |
| reader.mute() |
| except Exception as e: |
| return e.__str__() |
| |
| |
| def conduct_apdu_exchange(reader: PN532, current: FullApduEntry) -> str | None: |
| """Conducts an APDU exchange between the emulator and the PN 532 module. |
| |
| Once the device is detected by the reader, the reader will send the APDU |
| commands to the device and receive the responses. If an error occurs -- for |
| instance, if the response from the emulator differs from the expected response |
| -- the error is logged in the output. |
| |
| Args: |
| reader: The PN532 reader. |
| current: A data object containing the APDU commands to be sent to the |
| emulator and the expected responses. |
| |
| Returns: |
| the error, if one occurs |
| """ |
| try: |
| for i in range(_NUM_RETRIES): |
| tag = reader.poll_a() |
| if tag is not None: |
| transacted = tag.transact(current.command, current.response) |
| |
| if not transacted: |
| return "Received incorrect response. Expected: ".format( |
| current.response |
| ) |
| return None |
| reader.mute() |
| return "No tag received" |
| except Exception as e: |
| return e.__str__() |
| |
| |
| def replay_transaction(log, module_path: str): |
| """Replays the given transaction log on the PN 532 module. |
| |
| Args: |
| log: The transaction log to be replayed. |
| module_path: The serial path to the PN 532 module. |
| """ |
| try: |
| reader = PN532(module_path) |
| except Exception as e: |
| print("Could not connect to PN532 module") |
| return |
| |
| if not log: |
| return |
| |
| prev_time = log[0].ts |
| |
| for current in log: |
| num_seconds = (current.ts - prev_time) / 1000000 |
| if num_seconds < _MIN_SLEEP_TIME_SECONDS: |
| time.sleep(_MIN_SLEEP_TIME_SECONDS) |
| elif num_seconds > _MAX_SLEEP_TIME_SECONDS: |
| time.sleep(_MAX_SLEEP_TIME_SECONDS) |
| else: |
| time.sleep(num_seconds) |
| |
| error = None |
| |
| if isinstance(current, PollingLoopEntry): |
| if current.type == NfcType.NFC_A: |
| error = send_nfc_a_data(reader) |
| elif current.type == NfcType.NFC_B: |
| error = send_nfc_b_data(reader) |
| elif current.type == NfcType.UNKNOWN: |
| error = send_unknown_data(reader, current.data) |
| elif isinstance(current, FullApduEntry): |
| error = conduct_apdu_exchange(reader, current) |
| |
| output_line_for_snoop_log(current, error) |
| |
| # adjust timestamp |
| prev_time = current.ts |
| |
| reader.mute() |
| |
| |
| def parse_snoop_log(args: argparse.Namespace): |
| """Parses the given snoop log file. |
| |
| If the file will be used for replaying a transaction with the emulator app, |
| the AIDs will be replaced with the ones used by the app. Additionally, if the |
| user specifies a start and end time, the log will be filtered to only include |
| transactions that fall within that timeframe. |
| |
| Args: |
| snoop_file: The local path to the snoop log file. |
| |
| Returns: |
| The parsed snoop log. |
| """ |
| parsed = open_and_parse_file(args.file) |
| |
| # replace the AIDs with the ones used by the emulator app |
| if args.replay_with_app or args.parse_only: |
| parsed = replace_aids(parsed) |
| |
| return parse_timeframe(parsed, args.start, args.end) |
| |
| |
| def output_line_for_snoop_log( |
| entry: PollingLoopEntry | FullApduEntry, |
| error: str | None, |
| ): |
| """Outputs a summary of an interaction from a snoop log. |
| |
| Args: |
| entry: The current interaction to be replayed from the snoop log. |
| error: Whether or not an error occurred during the replayed (actual) |
| transaction. |
| """ |
| cur_time = int(float(datetime.datetime.now().timestamp() * 1000000)) |
| cur_time_str = datetime.datetime.fromtimestamp(cur_time / 1000000).strftime( |
| "%Y-%m-%d %H:%M:%S.%f" |
| ) |
| if isinstance(entry, FullApduEntry): |
| print( |
| _APDU_OUTPUT_STR.format( |
| cur_time_str, |
| [command.hex() for command in entry.command], |
| [response.hex() for response in entry.response], |
| ) |
| ) |
| else: # isinstance(entry, PollingLoopEntry) |
| print(_POLLING_OUTPUT_STR.format(cur_time_str, entry.type.name)) |
| |
| if error is not None: |
| print(_ERROR_STR.format(error)) |
| |
| |
| def print_opening_sequence( |
| file_name: str, |
| start: str | None = None, |
| end: str | None = None, |
| ): |
| """Prints the opening sequence for a test case or snoop log. |
| |
| The name of the file to be replayed is displayed, along with the timeframe |
| that will be replayed, if specified by the user. |
| |
| Args: |
| file_name: The name of the file to be replayed. |
| start: The start of the timeframe to be replayed. |
| end: The end of the timeframe to be replayed. |
| """ |
| print() |
| print("Replaying transaction from snoop log: {}".format(file_name)) |
| if start is not None and end is not None: |
| print("Timeframe: {} - {}".format(start, end)) |
| elif start is not None: |
| print("Timeframe: {} - end".format(start)) |
| elif end is not None: |
| print("Timeframe: start - {}".format(end)) |
| else: |
| print() |
| |
| |
| def create_file_for_emulator_app( |
| output: list[PollingLoopEntry | FullApduEntry], filename: str |
| ): |
| """Creates a file containing the parsed APDU exchanges from a snoop log. |
| |
| This will be to replay the transaction with an Android app installed on the |
| emulator. |
| |
| Args: |
| output: A list of polling loop entries and APDU exchanges parsed from the |
| snoop log. |
| filename: The name of the file to be created. This is near-identical to the |
| name of the snoop log file. |
| """ |
| local_path = _EMULATOR_APP_PARSED_DIR + filename.replace("/", "_") |
| full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + local_path |
| try: |
| file = open(full_path, "wt") |
| except Exception as e: |
| raise RuntimeError( |
| "Error occurred while opening file: {}".format(full_path) |
| ) from e |
| for entry in output: |
| if isinstance(entry, FullApduEntry): |
| file.write( |
| "{};{}".format( |
| [command.hex() for command in entry.command], |
| [response.hex() for response in entry.response], |
| ) |
| ) |
| file.write("\n") |
| print() |
| print("File for third party app generated at: {}".format(local_path)) |
| |
| |
| def get_name_for_test_case(filename: str) -> str: |
| return "Generated" + filename.replace("/", "").replace(".txt", "") |
| |
| |
| def main(): |
| parser = argparse.ArgumentParser(prog="pn532") |
| parser.add_argument( |
| "-p", |
| "--path", |
| action="store", |
| help="Path to the PN532 serial device, e.g. /dev/ttyUSB0", |
| ) |
| parser.add_argument( |
| "-f", |
| "--file", |
| action="store", |
| required=True, |
| help="Path to the file of the snoop log", |
| ) |
| parser.add_argument( |
| "--start", |
| action="store", |
| help="Start of the timeframe to be replayed", |
| ) |
| parser.add_argument( |
| "--end", |
| action="store", |
| help="End of the timeframe to be replayed", |
| ) |
| parser.add_argument( |
| "--parse_only", |
| action="store_true", |
| help="Parse the log without replaying the transaction", |
| ) |
| parser.add_argument( |
| "--replay_with_app", |
| action="store_true", |
| help="Replay the transaction with the emulator app", |
| ) |
| parser.add_argument( |
| "--generate_and_replay_test", |
| action="store_true", |
| help="Generate a test case from the log and then immediately run it", |
| ) |
| args = parser.parse_args() |
| |
| parsed_snoop_log = parse_snoop_log(args) |
| if args.parse_only: # scenario 1: parse snoop log for the emulator app |
| create_file_for_emulator_app(parsed_snoop_log, args.file) |
| else: # scenario 2: replay transaction from a snoop log |
| print_opening_sequence( |
| file_name=args.file, |
| start=args.start, |
| end=args.end, |
| ) |
| if args.generate_and_replay_test: # Replay the test that was just generated |
| test_case_name = get_name_for_test_case(args.file) |
| apdu_local_file = generate_test(parsed_snoop_log, test_case_name) |
| test_command = [ |
| "atest", |
| "-v", |
| test_case_name, |
| "--", |
| "--testparam", |
| "pn532_serial_path=" + args.path, |
| "--testparam", |
| "file_path=" + apdu_local_file, |
| ] |
| if args.replay_with_app: |
| test_command += ["--testparam", "with_emulator_app=True"] |
| subprocess.run(test_command) |
| else: # Default: replay the transaction |
| replay_transaction(parsed_snoop_log, args.path) |
| |
| |
| if __name__ == "__main__": |
| main() |