blob: cd3c5f23940b50bb7fa6a07db230b9eed0b900e0 [file] [log] [blame]
Gilles Boccon-Gibod6ac91f72022-05-16 19:42:31 -07001# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import random
19import os
20from bumble.transport.common import PacketParser
21
22
23# -----------------------------------------------------------------------------
24class Sink:
25 def __init__(self):
26 self.packets = []
27
28 def on_packet(self, packet):
29 self.packets.append(packet)
30
31
32# -----------------------------------------------------------------------------
33def test_parser():
34 sink1 = Sink()
35 parser1 = PacketParser(sink1)
36 sink2 = Sink()
37 parser2 = PacketParser(sink2)
38
39 for parser in [parser1, parser2]:
Gilles Boccon-Gibod135df0d2022-12-10 08:53:51 -080040 with open(
41 os.path.join(os.path.dirname(__file__), 'hci_data_001.bin'), 'rb'
42 ) as input:
Gilles Boccon-Gibod6ac91f72022-05-16 19:42:31 -070043 while True:
44 n = random.randint(1, 9)
45 data = input.read(n)
46 if not data:
47 break
48 parser.feed_data(data)
49
Gilles Boccon-Gibod135df0d2022-12-10 08:53:51 -080050 assert sink1.packets == sink2.packets
Gilles Boccon-Gibod6ac91f72022-05-16 19:42:31 -070051
52
53# -----------------------------------------------------------------------------
54def test_parser_extensions():
55 sink = Sink()
56 parser = PacketParser(sink)
57
58 # Check that an exception is thrown for an unknown type
59 try:
60 parser.feed_data(bytes([0x77, 0x00, 0x02, 0x01, 0x02]))
61 exception_thrown = False
62 except ValueError:
63 exception_thrown = True
64
Gilles Boccon-Gibod135df0d2022-12-10 08:53:51 -080065 assert exception_thrown
Gilles Boccon-Gibod6ac91f72022-05-16 19:42:31 -070066
67 # Now add a custom info
68 parser.extended_packet_info[0x77] = (1, 1, 'B')
69 parser.reset()
70 parser.feed_data(bytes([0x77, 0x00, 0x02, 0x01, 0x02]))
Gilles Boccon-Gibod135df0d2022-12-10 08:53:51 -080071 assert len(sink.packets) == 1
Gilles Boccon-Gibod6ac91f72022-05-16 19:42:31 -070072
73
74# -----------------------------------------------------------------------------
Gilles Boccon-Giboddc3ac302023-03-19 21:18:44 -070075if __name__ == '__main__':
76 test_parser()
77 test_parser_extensions()