| # Copyright (C) 2024 The Android Open Source Project |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # Lint as: python3 |
| |
| """Parses the snoop log to extract polling loop data and APDU exchanges.""" |
| |
| import base64 |
| import dataclasses |
| import datetime |
| import enum |
| import os |
| import zlib |
| |
| PREAMBLE_LENGTH = 9 |
| HEADER_LENGTH = 7 |
| SNOOP_LOG_START = "BEGIN:NFCSNOOP_" |
| SNOOP_LOG_END = "END:NFCSNOOP_" |
| |
| # Bytes identifying the starts of polling loop and APDU transactions |
| POLLING_LOOP_START_BYTES = bytes.fromhex("6f0c") |
| APDU_START_BYTES = bytes.fromhex("6f02") |
| |
| # Index corresponding to the total length of the APDU data packet |
| APDU_LEN_INDEX = 2 |
| |
| # Size of the main APDU header, which precedes either a list of APDU exchanges |
| # or a single APDU exchange |
| APDU_MAIN_HEADER_SIZE = 5 |
| |
| # Start of the APDU data, which follows the main header |
| APDU_DATA_START_INDEX = 6 |
| |
| # Minimum lengths of a valid APDU command and response |
| APDU_COMMAND_MIN_LENGTH = 13 |
| APDU_RESPONSE_MIN_LENGTH = 10 |
| |
| # Sequence of bytes that identifies an APDU transaction |
| APDU_IDENTIFIER = bytes([0x20, 0x00]) |
| |
| # Byte that identifies an APDU command |
| APDU_COMMAND_IDENTIFIER = 0x19 |
| |
| # Byte that identifies an APDU response |
| APDU_RESPONSE_IDENTIFIER = 0x08 |
| |
| # Sequence of bytes that identifies whether the APDU command or response was |
| # the first command or response in a longer list of APDU commands or responses |
| APDU_ORDER_FIRST = 0x02 |
| APDU_ORDER_FIRST_ALT = bytes([0x0A, 0x00]) |
| APDU_ORDER_SECOND = 0x03 |
| APDU_ORDER_SECOND_ALT = bytes([0x0B, 0x00]) |
| |
| # Sequence of bytes that identifies the start of a "SELECT AID" APDU command |
| AID_START_BYTES = bytes.fromhex("00A40400") |
| |
| # AID groups that are used by the emulator app |
| SELECT_AID_FIRST = bytes.fromhex("00A4040008A000000151000000") |
| SELECT_AID_SECOND = bytes.fromhex("00A4040008A000000003000000") |
| |
| |
| class NfcType(enum.Enum): |
| NFC_A = 0 |
| NFC_B = 1 |
| NFC_F = 2 |
| NFC_V = 3 |
| REMOTE_FIELD = 4 |
| UNKNOWN = 5 |
| |
| |
| @dataclasses.dataclass |
| class PollingLoopEntry: |
| ts: int = 0 |
| type: NfcType = NfcType.UNKNOWN |
| data: bytes = b"" |
| error: str | None = None |
| |
| |
| @dataclasses.dataclass |
| class PartialApduEntry: |
| ts: int = 0 |
| is_command: bool = False |
| data: bytes = b"" # data sent by the APDU command or response |
| is_first: bool = ( |
| True |
| # whether this is the first entry in the list of APDU commands or responses sent together |
| ) |
| |
| |
| @dataclasses.dataclass |
| class FullApduEntry: |
| ts: int = 0 |
| command: list[bytes] = dataclasses.field(default_factory=lambda: []) |
| response: list[bytes] = dataclasses.field(default_factory=lambda: []) |
| error: str | None = None |
| |
| |
| def replace_aids( |
| log: list[PollingLoopEntry | FullApduEntry], |
| ) -> list[PollingLoopEntry | FullApduEntry]: |
| """Replaces the AIDs in the log with the AIDs that are used by the emulator app.""" |
| new_log: list[PollingLoopEntry | FullApduEntry] = [] |
| for cur in log: |
| is_first_aid = True |
| if isinstance(cur, FullApduEntry): |
| new_apdu_entry = FullApduEntry( |
| ts=cur.ts, command=[], response=cur.response |
| ) |
| for cmd in cur.command: |
| if cmd.startswith(AID_START_BYTES): |
| if is_first_aid: |
| new_apdu_entry.command.append(SELECT_AID_FIRST) |
| is_first_aid = False |
| else: |
| new_apdu_entry.command.append(SELECT_AID_SECOND) |
| else: |
| new_apdu_entry.command.append(cmd) |
| new_log.append(new_apdu_entry) |
| else: |
| new_log.append(cur) |
| return new_log |
| |
| |
| def parse_timeframe(log, start, end): |
| """Returns a subset of the log that falls within the given timeframe.""" |
| if start is None and end is None: |
| return log |
| parsed_log = log |
| if start is not None: |
| start_dt = datetime.datetime.strptime(start, "%Y-%m-%d %H:%M:%S.%f") |
| start_ts = int(float(datetime.datetime.timestamp(start_dt)) * 1000000) |
| parsed_log = list(filter(lambda x: x.ts >= start_ts, log)) |
| if end is not None: |
| end_dt = datetime.datetime.strptime(end, "%Y-%m-%d %H:%M:%S.%f") |
| end_ts = int(float(datetime.datetime.timestamp(end_dt)) * 1000000) |
| parsed_log = list(filter(lambda x: x.ts <= end_ts, parsed_log)) |
| return parsed_log |
| |
| |
| def standardize_log( |
| log: list[PollingLoopEntry | PartialApduEntry], |
| ) -> list[PollingLoopEntry | FullApduEntry]: |
| """Standardizes the log to ensure that it can be replayed by the PN 532 module. |
| |
| This includes removing redundant calls to polling loop A and combining APDU |
| commands and responses into a single entry. |
| |
| Args: |
| log: The log to be standardized. |
| |
| Returns: |
| The standardized log. |
| """ |
| cmds = [] |
| rsps = [] |
| last_ts = 0 |
| standardized: list[PollingLoopEntry | FullApduEntry] = [] |
| for cur in log: |
| if isinstance(cur, PollingLoopEntry): |
| if cur.type == NfcType.NFC_A or cur.type == NfcType.NFC_B: |
| standardized.append(cur) |
| elif cur.type == NfcType.UNKNOWN: |
| if not standardized: |
| standardized.append(cur) |
| else: |
| standardized[-1] = cur |
| elif isinstance(cur, PartialApduEntry): |
| if cur.is_command: |
| if len(cmds) == len(rsps) + 1: # extra command without response |
| rsps.append(b"") |
| if len(cmds) == len(rsps) != 0 and cur.data.startswith(AID_START_BYTES): |
| standardized.append(FullApduEntry(last_ts, cmds, rsps)) |
| cmds = [] |
| rsps = [] |
| cmds.append(cur.data) |
| else: |
| if len(cmds) == len(rsps): # extra response without command |
| continue |
| rsps.append(cur.data) |
| last_ts = cur.ts |
| # handle last command and response |
| if len(cmds) == len(rsps) + 1: |
| rsps.append(b"") |
| if len(cmds) == len(rsps) != 0: |
| standardized.append(FullApduEntry(last_ts, cmds, rsps)) |
| return standardized |
| |
| |
| def parse_file(data: bytes) -> list[PollingLoopEntry | PartialApduEntry]: |
| """Parses the file to extract polling loop data and APDU exchanges.""" |
| if not data: |
| raise RuntimeError("No data found in file") |
| version = data[0] |
| if version != 1: |
| raise RuntimeError("Unsupported version: {}".format(version)) |
| |
| offset = PREAMBLE_LENGTH |
| header_length = HEADER_LENGTH |
| pts_offset = 2 |
| polling_list = [] |
| ts = calculate_timestamp(data) |
| while len(data) - offset > header_length: |
| |
| # length of the current transaction in bytes |
| length = data[offset] + (data[offset + 1] << 8) |
| |
| # duration between the last transaction and the current one |
| pts = bytearray(data[offset + pts_offset : offset + pts_offset + 4]) |
| pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24) |
| ts += pts_real |
| |
| transaction_type = (data[offset + header_length] & 0xE0) >> 5 |
| if transaction_type == 3: # ST_NTF or NCI_NTF transactions |
| cur_data = data[offset + header_length : offset + header_length + length] |
| if cur_data.startswith(POLLING_LOOP_START_BYTES): |
| polling_list.extend(add_polling_data(cur_data, ts)) |
| elif cur_data.startswith(APDU_START_BYTES): |
| apdu_transactions = find_apdu_transactions(cur_data, ts) |
| polling_list.extend(apdu_transactions) |
| offset += header_length + length |
| return polling_list |
| |
| |
| def open_and_parse_file( |
| file_path: str, |
| ) -> list[PollingLoopEntry | FullApduEntry]: |
| """Opens the file that contains the unparsed snoop log and parses it. |
| |
| Args: |
| file_path: The path of the file containing the unparsed snoop log. |
| |
| Returns: |
| A list of polling loop entries and APDU exchanges parsed from the file. |
| |
| Raises: |
| RuntimeError: If the file cannot be found. |
| """ |
| snoop_file = open_read_file(file_path) |
| str_data = "" |
| found_log = False |
| while line := snoop_file.readline(): |
| if not found_log and SNOOP_LOG_START in line: |
| found_log = True |
| elif found_log: |
| if SNOOP_LOG_END in line: |
| break |
| str_data += line |
| snoop_bytes = inflate(base64.b64decode(str_data)) |
| parsed = parse_file(snoop_bytes) |
| return standardize_log(parsed) |
| |
| |
| def find_apdu_transactions(data: bytes, ts: int) -> list[PartialApduEntry]: |
| """Finds all APDU transactions in the given data.""" |
| total_size = data[APDU_LEN_INDEX] |
| if total_size < APDU_MAIN_HEADER_SIZE or data[4:6] != APDU_IDENTIFIER: |
| return [] |
| |
| apdus: list[PartialApduEntry] = [] |
| index = APDU_DATA_START_INDEX |
| while index < len(data): |
| cur_size = data[index + 1] |
| cur_data = data[index : index + cur_size + 2] |
| cmd, is_first = parse_apdu_command(cur_data) or (None, None) |
| if cmd is not None: |
| apdus.append( |
| PartialApduEntry(ts=ts, is_command=True, data=cmd, is_first=is_first) |
| ) |
| else: |
| rsp, is_first = parse_apdu_response(cur_data) or (None, None) |
| if rsp is not None: |
| apdus.append( |
| PartialApduEntry( |
| ts=ts, is_command=False, data=rsp, is_first=is_first |
| ) |
| ) |
| index += cur_size + 2 |
| return apdus |
| |
| |
| def parse_apdu_command(data: bytes): |
| """Isolate the bytes sent from the reader to the emulator. |
| |
| Args: |
| data: The raw APDU command in bytes. |
| |
| Returns: |
| the data sent by the APDU command, or none if it is not a valid APDU |
| command. |
| """ |
| if len(data) < APDU_COMMAND_MIN_LENGTH: |
| return None |
| if data[0] != APDU_COMMAND_IDENTIFIER: |
| return None |
| if data[1] != len(data) - 2: |
| return None |
| if data[5:7] != bytes.fromhex("0000"): |
| return None |
| if data[8] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]: |
| is_first = True if data[8] == APDU_ORDER_FIRST else False |
| return data[9:-4], is_first |
| elif data[8:10] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]: |
| is_first = True if data[8:10] == APDU_ORDER_FIRST_ALT else False |
| return data[10:-4], is_first |
| return None |
| |
| |
| def parse_apdu_response(data: bytes): |
| """Isolates the data sent from the emulator to the reader. |
| |
| Args: |
| data: The raw APDU response in bytes. |
| |
| Returns: |
| the data sent by the APDU response, or none if it is not a valid APDU |
| response. |
| """ |
| if len(data) < APDU_RESPONSE_MIN_LENGTH: |
| return None |
| if data[0] != APDU_RESPONSE_IDENTIFIER: |
| return None |
| if data[1] != len(data) - 2: |
| return None |
| if data[5] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]: |
| is_first = True if data[5] == APDU_ORDER_FIRST else False |
| return data[6:-4], is_first |
| elif data[5:7] in [APDU_ORDER_FIRST_ALT, APDU_ORDER_SECOND_ALT]: |
| is_first = True if data[5:7] == APDU_ORDER_FIRST_ALT else False |
| return data[7:-4], is_first |
| elif data[7] in [APDU_ORDER_FIRST, APDU_ORDER_SECOND]: |
| is_first = True if data[7] == APDU_ORDER_FIRST else False |
| return data[8:-4], is_first |
| return None |
| |
| |
| def add_polling_data(data: bytes, ts: int) -> list[PollingLoopEntry]: |
| """Adds polling data to the list of transactions. |
| |
| Each entry may contain multiple polling data transactions. |
| |
| Args: |
| data: The raw polling data in bytes. |
| ts: The timestamp of the polling transaction. |
| |
| Returns: |
| A list of polling data transactions. |
| """ |
| transaction_list = [] |
| count = 4 |
| while count < len(data): |
| flag = data[count] |
| match flag: |
| case 0: |
| entry_type = NfcType.REMOTE_FIELD |
| case 1: |
| entry_type = NfcType.NFC_A |
| case 2: |
| entry_type = NfcType.NFC_B |
| case _: |
| entry_type = NfcType.UNKNOWN |
| length = data[count + 2] - 5 |
| polling_data = data[count + 8 : count + 8 + length] |
| transaction_list.append( |
| PollingLoopEntry( |
| ts=ts, |
| type=entry_type, |
| data=polling_data, |
| ) |
| ) |
| count += 8 + length |
| return transaction_list |
| |
| |
| def calculate_timestamp(data: bytes) -> int: |
| """Calculates the timestamp of the first frame in the log.""" |
| ts = data[1:9] |
| ts_real = ( |
| ts[0] |
| + (ts[1] << 8) |
| + (ts[2] << 16) |
| + (ts[3] << 24) |
| + (ts[4] << 32) |
| + (ts[5] << 40) |
| + (ts[6] << 48) |
| + (ts[7] << 56) |
| ) |
| offset = PREAMBLE_LENGTH |
| while (len(data) - offset) > HEADER_LENGTH: |
| length = data[offset] + (data[offset + 1] << 8) |
| pts = bytearray(data[offset + 2 : offset + 6]) |
| pts_real = pts[0] + (pts[1] << 8) + (pts[2] << 16) + (pts[3] << 24) |
| ts_real -= pts_real |
| offset += HEADER_LENGTH + length |
| return ts_real |
| |
| |
| def inflate(data: bytes) -> bytes: |
| """Inflates decompressed data.""" |
| decompressed = zlib.decompressobj().decompress(data[PREAMBLE_LENGTH:]) |
| return data[0:PREAMBLE_LENGTH] + decompressed |
| |
| |
| def open_read_file(file_path: str): |
| """Opens the file at the given path. |
| |
| Args: |
| file_path: The path of the file to be opened. This can be either a local |
| path or an absolute path. |
| |
| Returns: |
| An object representing the opened file. |
| |
| Raises: |
| RuntimeError: If the file cannot be opened. |
| """ |
| full_path = os.path.dirname(os.path.realpath(__file__)) + "/" + file_path |
| if os.path.exists(file_path): |
| file_to_open = file_path |
| elif os.path.exists(full_path): |
| file_to_open = full_path |
| else: |
| raise RuntimeError("File not found: {}".format(file_path)) |
| |
| try: |
| return open(file_to_open, "rt") |
| except Exception as e: |
| raise RuntimeError( |
| "Error occurred while opening file: {}".format(file_path) |
| ) from e |