blob: bba6c665b656c535c7e3e1dec0b8d3e97a3f46a5 [file] [log] [blame]
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import struct
import click
from colors import color
from bumble import hci
from bumble.transport.common import PacketReader
from bumble.helpers import PacketTracer
# -----------------------------------------------------------------------------
class SnoopPacketReader:
'''
Reader that reads HCI packets from a "snoop" file (based on RFC 1761, but not exactly the same...)
'''
DATALINK_H1 = 1001
DATALINK_H4 = 1002
DATALINK_BSCP = 1003
DATALINK_H5 = 1004
def __init__(self, source):
self.source = source
# Read the header
identification_pattern = source.read(8)
if identification_pattern.hex().lower() != '6274736e6f6f7000':
raise ValueError('not a valid snoop file, unexpected identification pattern')
(self.version_number, self.data_link_type) = struct.unpack('>II', source.read(8))
if self.data_link_type != self.DATALINK_H4 and self.data_link_type != self.DATALINK_H1:
raise ValueError(f'datalink type {self.data_link_type} not supported')
def next_packet(self):
# Read the record header
header = self.source.read(24)
if len(header) < 24:
return (0, None)
(
original_length,
included_length,
packet_flags,
cumulative_drops,
timestamp_seconds,
timestamp_microsecond
) = struct.unpack('>IIIIII', header)
# Abort on truncated packets
if original_length != included_length:
return (0, None)
if self.data_link_type == self.DATALINK_H1:
# The packet is un-encapsulated, look at the flags to figure out its type
if packet_flags & 1:
# Controller -> Host
if packet_flags & 2:
packet_type = hci.HCI_EVENT_PACKET
else:
packet_type = hci.HCI_ACL_DATA_PACKET
else:
# Host -> Controller
if packet_flags & 2:
packet_type = hci.HCI_COMMAND_PACKET
else:
packet_type = hci.HCI_ACL_DATA_PACKET
return (packet_flags & 1, bytes([packet_type]) + self.source.read(included_length))
else:
return (packet_flags & 1, self.source.read(included_length))
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
@click.command()
@click.option('--format', type=click.Choice(['h4', 'snoop']), default='h4', help='Format of the input file')
@click.argument('filename')
def show(format, filename):
input = open(filename, 'rb')
if format == 'h4':
packet_reader = PacketReader(input)
def read_next_packet():
(0, packet_reader.next_packet())
else:
packet_reader = SnoopPacketReader(input)
read_next_packet = packet_reader.next_packet
tracer = PacketTracer(emit_message=print)
while True:
try:
(direction, packet) = read_next_packet()
if packet is None:
break
tracer.trace(hci.HCI_Packet.from_bytes(packet), direction)
except Exception as error:
print(color(f'!!! {error}', 'red'))
pass
# -----------------------------------------------------------------------------
if __name__ == '__main__':
show()