| # Copyright (C) 2024 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # Lint as: python3 |
| |
| """Generates a Python test case from a snoop log.""" |
| |
| import json |
| import math |
| import os |
| |
| from parse_log import FullApduEntry, NfcType, PollingLoopEntry |
| |
# Number of spaces that make up one indentation level in the generated test
# file; create_indent() multiplies the requested level by this value.
INDENT_SIZE = 4
| |
| |
def generate_test(
    log: list[FullApduEntry | PollingLoopEntry], name: str
) -> str:
    """Generates a Python test case from a snoop log parsed by the replay tool.

    Two files are written next to this script (not in the current working
    directory): the test itself, `<name>_test.py`, and the APDU data it
    replays, `<name>_apdus.json`. A build entry for the test is then appended
    to `Android.bp` via update_android_bp().

    Args:
        log: The parsed snoop log. An empty log produces a test that contains
            only the setup code and an empty APDU list.
        name: The name of the file containing the snoop log.

    Returns:
        The name of the JSON file containing APDUs needed to run the test.

    Raises:
        RuntimeError: If the test file cannot be opened for writing, or if
            update_android_bp() cannot open Android.bp.
    """
    # The names of the output files are based on the name of the snoop log.
    output_dir = os.path.dirname(os.path.realpath(__file__))
    python_local_file = name + "_test.py"
    file_path = output_dir + "/" + python_local_file
    apdu_local_file = name + "_apdus.json"
    apdu_file_path = output_dir + "/" + apdu_local_file

    try:
        file = open(file_path, "wt")
    except OSError as e:
        raise RuntimeError(
            "Error occurred while opening file: {}".format(file_path)
        ) from e

    json_list = []
    # The context manager guarantees the generated test is flushed and closed
    # even if one of the create_* helpers raises part-way through.
    with file:
        file.write(create_imports())
        file.write(create_polling_loop_methods())
        file.write(create_apdu_exchange_method())
        file.write(create_setup())
        file.write(create_test_opening(name))

        # Each step sleeps for the gap since the previous entry; the first
        # entry is measured against itself. Guard against an empty log so
        # that indexing does not raise IndexError.
        last_timestamp = log[0].ts if log else 0
        for entry in log:
            if isinstance(entry, PollingLoopEntry):
                file.write(create_polling_loop_test(entry, last_timestamp))
            else:  # isinstance(entry, FullApduEntry):
                file.write(create_apdu_test(entry, last_timestamp))
                json_list.append(create_apdu_dict(entry))
            last_timestamp = entry.ts

        file.write(create_teardown_test())
        file.write(create_main_function())

    with open(apdu_file_path, "wt") as apdu_file:
        apdu_file.write(json.dumps(json_list))

    print()
    print(
        "Test generated at {}. To run the test, copy the test file to"
        " packages/modules/Nfc/NfcNci/tests/testcases/multidevices/.".format(file_path)
    )
    update_android_bp(python_local_file, name)

    return apdu_local_file
| |
| |
def update_android_bp(local_file_path: str, test_name: str) -> None:
    """Creates a new python_test_host entry in Android.bp for the generated test.

    The entry is appended to the `Android.bp` file in the current working
    directory; the file is created if it does not exist yet.

    Args:
        local_file_path: File name of the generated test, used for both the
            `main` and `srcs` properties.
        test_name: Value of the module's `name` property.

    Raises:
        RuntimeError: If Android.bp cannot be opened for appending.
    """
    s = create_line()
    s += create_line()
    s += create_line("python_test_host {")
    s += create_line('name: "{}",'.format(test_name), indent=1)
    s += create_line('main: "{}",'.format(local_file_path), indent=1)
    s += create_line('srcs: ["{}"],'.format(local_file_path), indent=1)
    s += create_line('test_config: "AndroidTest.xml",', indent=1)
    s += create_line('device_common_data: [', indent=1)
    s += create_line('":NfcEmulatorApduApp",', indent=2)
    s += create_line('"config.yaml",', indent=2)
    s += create_line('],', indent=1)
    s += create_line("test_options: {", indent=1)
    s += create_line("unit_test: false,", indent=2)
    s += create_line('runner: "mobly",', indent=2)
    s += create_line("},", indent=1)
    s += create_line('defaults: ["GeneratedTestsPythonDefaults"],', indent=1)
    s += create_line("}")

    try:
        android_bp = open("Android.bp", "a")
    except OSError as e:
        raise RuntimeError("Error occurred while opening Android.bp") from e

    # Close (and therefore flush) the handle deterministically instead of
    # relying on garbage collection to write the entry out.
    with android_bp:
        android_bp.write(s)
| |
| |
def create_apdu_dict(entry: FullApduEntry):
    """Converts an APDU entry into a dictionary of hex strings.

    Args:
        entry: The APDU exchange to convert. Every item of `entry.command` is
            hex-encoded; items of `entry.response` that are already strings
            are kept as they are, all others are hex-encoded.

    Returns:
        A dict with the keys "commands" and "responses", each mapping to a
        list of strings in the same order as the entry's items.
    """
    return {
        "commands": [cmd.hex() for cmd in entry.command],
        "responses": [
            rsp if isinstance(rsp, str) else rsp.hex()
            for rsp in entry.response
        ],
    }
| |
| |
def create_test_opening(name: str):
    """Creates the opening of the generated test method.

    The emitted code defines `test_<name>`, reads the APDU JSON file named by
    the "file_path" user parameter, hands its raw contents to the emulator
    app, and fills the `apdu_cmds` / `apdu_rsps` lists from it.

    Args:
        name: Suffix used for the generated test method's name.

    Returns:
        The generated source text.
    """
    # (text, indentation level) for every emitted line, in output order.
    opening = [
        ("def test_{}(self):".format(name), 1),
        ("# Read in APDU commands and responses from file", 2),
        ('file_path_name = self.user_params.get("file_path", "")', 2),
        ("apdu_cmds = []", 2),
        ("apdu_rsps = []", 2),
        ("if file_path_name:", 2),
        ('with open(file_path_name, "r") as json_str:', 3),
        ("self.emulator.nfc_emulator.startMainActivity(json_str.read())", 4),
        ("", 0),
        ('with open(file_path_name, "r") as json_data:', 3),
        ("d = json.load(json_data)", 4),
        ("for entry in d:", 4),
        ("apdu_cmds.append(", 5),
        ('[bytearray.fromhex(cmd) for cmd in entry["commands"]]', 6),
        (")", 5),
        ("apdu_rsps.append(", 5),
        ('[bytearray.fromhex(rsp) for rsp in entry["responses"]]', 6),
        (")", 5),
        ("", 0),
    ]
    return "".join(create_line(text, indent=level) for text, level in opening)
| |
| |
def create_polling_loop_test(entry: PollingLoopEntry, last_timestamp: int):
    """Adds code to send a polling loop from the reader to the emulator.

    The generated step sleeps for the gap since the previous entry, sends the
    polling loop that matches the entry's type (type A, type B, or a custom
    frame built from the entry's data), and asserts that it was seen.

    Args:
        entry: The polling loop entry to replay.
        last_timestamp: Timestamp of the previous log entry.

    Returns:
        The generated source text for this step.
    """
    pieces = [
        create_line("# Sending {} polling loop".format(entry.type), indent=2)
    ]

    delay = calculate_time_to_sleep(entry.ts, last_timestamp)
    pieces.append(create_line("time.sleep({})".format(delay), indent=2))

    loop_type = entry.type
    if loop_type == NfcType.NFC_A:
        pieces.append(
            create_line("saw_loop = send_polling_loop_a(self.reader)", indent=2)
        )
    elif loop_type == NfcType.NFC_B:
        pieces.append(
            create_line("saw_loop = send_polling_loop_b(self.reader)", indent=2)
        )
    else:  # NfcType.UNKNOWN
        pieces.append(
            create_line('custom_data = "{}"'.format(entry.data.hex()), indent=2)
        )
        pieces.append(
            create_line(
                "saw_loop = send_custom_polling_loop(self.reader, custom_data)",
                indent=2,
            )
        )

    pieces.append(
        create_line(
            'asserts.assert_true(saw_loop, "Did not see polling loop")',
            indent=2,
        )
    )
    pieces.append(create_line())
    return "".join(pieces)
| |
| |
def create_apdu_test(entry: FullApduEntry, last_timestamp: int):
    """Adds code to conduct an APDU exchange between the reader and emulator.

    The generated step sleeps for the gap since the previous entry, runs the
    exchange at the head of `apdu_cmds` / `apdu_rsps`, asserts that a tag was
    found and the transaction succeeded, and then pops that exchange.

    Args:
        entry: The APDU entry being replayed; only its timestamp is used here.
        last_timestamp: Timestamp of the previous log entry.

    Returns:
        The generated source text for this step.
    """
    delay = calculate_time_to_sleep(entry.ts, last_timestamp)

    # (text, indentation level) for every emitted line, in output order.
    step = [
        ("# Conducting APDU exchange", 2),
        ("time.sleep({})".format(delay), 2),
        ("commands = apdu_cmds[0]", 2),
        ("responses = apdu_rsps[0]", 2),
        (
            "tag_found, transacted = conduct_apdu_exchange(self.reader,"
            " commands, responses)",
            2,
        ),
        ("", 0),
        ("asserts.assert_true(", 2),
        ('tag_found, "Reader did not detect tag, transaction not attempted."', 3),
        (")", 2),
        ("asserts.assert_true(", 2),
        ("transacted,", 3),
        ('"Transaction failed, check device logs for more information."', 3),
        (")", 2),
        ("", 0),
        ("apdu_cmds.pop(0)", 2),
        ("apdu_rsps.pop(0)", 2),
        ("", 0),
    ]
    return "".join(create_line(text, indent=level) for text, level in step)
| |
| |
def create_imports():
    """Creates the header of the generated test file.

    The header consists of the module docstring, the import statements and
    the `_NUM_POLLING_LOOPS` constant.

    Returns:
        The generated source text.
    """
    header = [
        '"""Test generated from the NFC Replay Tool."""',
        "",
        "import json",
        "import time",
        "from mobly import asserts",
        "from mobly import base_test",
        "from mobly import test_runner",
        "from mobly.controllers import android_device",
        "import pn532",
        "",
        "# Number of polling loops to perform.",
        "_NUM_POLLING_LOOPS = 50",
        "",
    ]
    return "".join(create_line(text) for text in header)
| |
def create_polling_loop_methods():
    """Creates the module-level helpers that make the reader send polling loops.

    Three functions are emitted into the generated test:
    send_polling_loop_a(), send_polling_loop_b() and
    send_custom_polling_loop(). Each polls up to `_NUM_POLLING_LOOPS` times
    and returns whether a tag was seen.

    Returns:
        The generated source text.
    """
    # (text, indentation level) for every emitted line, in output order.
    helpers = [
        ("", 0),
        ("", 0),
        ("def send_polling_loop_a(reader: pn532.PN532) -> bool:", 0),
        ("saw_loop = False", 1),
        ("for i in range(_NUM_POLLING_LOOPS):", 1),
        ("tag = reader.poll_a()", 2),
        ("if tag is not None:", 2),
        ("saw_loop = True", 3),
        ("break", 3),
        ("reader.mute()", 2),
        ("return saw_loop", 1),
        ("", 0),
        ("", 0),
        ("def send_polling_loop_b(reader: pn532.PN532) -> bool:", 0),
        ("saw_loop = False", 1),
        ("for i in range(_NUM_POLLING_LOOPS):", 1),
        ("tag = reader.poll_b()", 2),
        ("if tag is not None:", 2),
        ("saw_loop = True", 3),
        ("break", 3),
        ("reader.mute()", 2),
        ("return saw_loop", 1),
        ("", 0),
        ("", 0),
        (
            "def send_custom_polling_loop(reader: pn532.PN532,"
            " custom_data_hex: str) -> bool:",
            0,
        ),
        ("saw_loop = False", 1),
        ("for i in range(_NUM_POLLING_LOOPS):", 1),
        ("tag = reader.poll_a()", 2),
        ("if tag is not None:", 2),
        ("reader.send_broadcast(bytearray.fromhex(custom_data_hex))", 3),
        ("saw_loop = True", 3),
        ("break", 3),
        ("reader.poll_b()", 2),
        ("reader.mute()", 2),
        ("return saw_loop", 1),
    ]
    return "".join(create_line(text, indent=level) for text, level in helpers)
| |
| |
def create_apdu_exchange_method():
    """Creates the helper that conducts an APDU exchange with the reader.

    The emitted conduct_apdu_exchange() polls for a type A tag up to
    `_NUM_POLLING_LOOPS` times. Once a tag is found it transacts the given
    commands and returns the tag together with the outcome; an exchange that
    expects no response always counts as successful.

    Returns:
        The generated source text.
    """
    # (text, indentation level) for every emitted line, in output order.
    method = [
        ("", 0),
        ("", 0),
        ("def conduct_apdu_exchange(", 0),
        (
            "reader: pn532.PN532, commands: list[bytearray],"
            " responses: list[bytearray]",
            2,
        ),
        (") -> tuple[pn532.tag.TypeATag | None, bool]:", 0),
        ('"""Conducts an APDU exchange with the PN532 reader."""', 1),
        ("for _ in range(_NUM_POLLING_LOOPS):", 1),
        ("tag = reader.poll_a()", 2),
        ("if tag is not None:", 2),
        ("transacted = tag.transact(commands, responses)", 3),
        ("reader.mute()", 3),
        ("# edge case: expect no response", 3),
        ('if not responses or responses[0] == bytearray.fromhex(""):', 3),
        ("return tag, True", 4),
        ("return tag, transacted", 3),
        ("reader.mute()", 2),
        ("return None, False", 1),
    ]
    return "".join(create_line(text, indent=level) for text, level in method)
| |
| |
def create_setup():
    """Creates the test class header and its setup_class() method.

    The emitted setup registers the Android device as the emulator, resolves
    the PN532 serial path (from the device's dimensions when present,
    otherwise from the user parameters), loads the emulator snippet, toggles
    NFC off and on, and creates and mutes the PN532 reader.

    Returns:
        The generated source text.
    """
    # (text, indentation level) for every emitted line, in output order.
    setup = [
        ("", 0),
        ("", 0),
        ("class GeneratedMultiDeviceTestCases(base_test.BaseTestClass):", 0),
        ("", 0),
        ("def setup_class(self):", 1),
        ("self.emulator = self.register_controller(android_device)[0]", 2),
        ('self.emulator.debug_tag = "emulator"', 2),
        (
            'if (hasattr(self.emulator, "dimensions") and "pn532_serial_path" in self.emulator.dimensions):',
            2,
        ),
        ('pn532_serial_path = self.emulator.dimensions["pn532_serial_path"]', 3),
        ("else:", 2),
        ('pn532_serial_path = self.user_params.get("pn532_serial_path", "")', 3),
        (
            'self.emulator.load_snippet("nfc_emulator", "com.android.nfc.emulatorapp")',
            2,
        ),
        ('self.emulator.adb.shell(["svc", "nfc", "disable"])', 2),
        ('self.emulator.adb.shell(["svc", "nfc", "enable"])', 2),
        ("self.reader = pn532.PN532(pn532_serial_path)", 2),
        ("self.reader.mute()", 2),
        ("", 0),
    ]
    return "".join(create_line(text, indent=level) for text, level in setup)
| |
| |
def create_teardown_test():
    """Creates the teardown_test() method, which mutes the reader.

    Returns:
        The generated source text.
    """
    signature = create_line("def teardown_test(self):", indent=1)
    body = create_line("self.reader.mute()", indent=2)
    return signature + body
| |
| |
def create_main_function():
    """Creates the `__main__` guard that starts the Mobly test runner.

    Returns:
        The generated source text, surrounded by one blank line on each side.
    """
    footer = [
        ("", 0),
        ('if __name__ == "__main__":', 0),
        ("test_runner.main()", 1),
        ("", 0),
    ]
    return "".join(create_line(text, indent=level) for text, level in footer)
| |
| |
def create_line(s: str = "", indent: int = 0):
    """Returns `s` as one line of generated code.

    Args:
        s: The text of the line; defaults to an empty line.
        indent: The indentation level to prefix the text with.

    Returns:
        The indentation, followed by `s` and a trailing newline.
    """
    prefix = create_indent(indent)
    return f"{prefix}{s}\n"
| |
| |
def create_indent(multiplier: int):
    """Returns the whitespace for `multiplier` indentation levels.

    Each level is INDENT_SIZE spaces wide.
    """
    width = multiplier * INDENT_SIZE
    return " " * width
| |
| |
def calculate_time_to_sleep(current_ts: int, last_ts: int) -> int:
    """Returns how long the generated test should sleep before the next step.

    The gap between the two timestamps is divided by 1,000,000 and rounded up
    to a whole number; the result is never less than 1.

    Args:
        current_ts: Timestamp of the entry about to be replayed.
        last_ts: Timestamp of the previously replayed entry.

    Returns:
        The sleep duration, at least 1.
    """
    elapsed = math.ceil((current_ts - last_ts) / 1000000)
    return max(1, elapsed)