| % Regression tests for the DoIP layer |
| |
| # More information at http://www.secdev.org/projects/UTscapy/ |
| |
| |
| ############ |
| ############ |
| |
| + Doip contrib tests |
| |
| = Load Contrib Layer |
| |
| load_contrib("automotive.doip", globals_dict=globals()) |
| load_contrib("automotive.uds", globals_dict=globals()) |
| |
| = Defaults test |
| |
p = DoIP(payload_type=1)

# An unbuilt packet exposes the header defaults; the payload length is still
# unset at this point.
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
# None is a singleton: compare by identity so a custom __eq__ cannot interfere.
assert p.payload_length is None
assert p.payload_type == 1
| |
| = Build test 0 |
| |
p = DoIP(bytes(DoIP(payload_type=0)))

# Build/dissect round trip for payload type 0: version pair, computed length
# and the default NACK value.
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0, 1)
assert p.nack == 0
| |
| = Build test 1 |
| |
p = DoIP(bytes(DoIP(payload_type=1)))

# Payload type 1 builds to a header-only message (length 0).
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (1, 0)
| |
| = Build test 2 |
| |
p = DoIP(bytes(DoIP(payload_type=2)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (2, 6)
# The default EID is six zero bytes.
assert bytes(p.eid) == bytes(6)
| |
| = Build test 3 |
| |
p = DoIP(bytes(DoIP(payload_type=3)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (3, 17)
# The default VIN is seventeen zero bytes.
assert bytes(p.vin) == bytes(17)
| |
| = Build test 4 |
| |
p = DoIP(bytes(DoIP(payload_type=4)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (4, 33)
# Identifier fields default to all-zero byte strings of fixed size.
assert bytes(p.vin) == bytes(17)
assert p.logical_address == 0
assert (bytes(p.eid), bytes(p.gid)) == (bytes(6), bytes(6))
assert (p.further_action, p.vin_gid_status) == (0, 0)
| |
| = Build test 5 |
| |
p = DoIP(bytes(DoIP(payload_type=5)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (5, 7)
assert (p.source_address, p.activation_type, p.reserved_iso) == (0, 0, 0)
# Without OEM data the optional trailing field dissects as empty.
assert p.reserved_oem == b""
| |
| = Build test 5.1 |
| |
p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"1234")))

# Four OEM bytes raise the payload length from 7 to 11.
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (5, 11)
assert (p.source_address, p.activation_type, p.reserved_iso) == (0, 0, 0)
# Dump the dissected packet before checking the OEM bytes.
p.show()
print(p.reserved_oem)
assert p.reserved_oem == b"1234"
| |
| = Build test 5.2 |
| |
p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"12")))

# Two OEM bytes raise the payload length from 7 to 9.
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (5, 9)
assert (p.source_address, p.activation_type, p.reserved_iso) == (0, 0, 0)
assert p.reserved_oem == b"12"
| |
| = Build test 6 |
| |
p = DoIP(bytes(DoIP(payload_type=6)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (6, 9)
# Both logical addresses and the ISO reserved field default to zero.
assert (p.logical_address_tester, p.logical_address_doip_entity) == (0, 0)
assert p.reserved_iso == 0
assert p.reserved_oem == b""
| |
| = Build test 7 |
| |
p = DoIP(bytes(DoIP(payload_type=7)))

# Payload type 7 builds to a header-only message (length 0).
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (7, 0)
| |
| = Build test 8 |
| |
p = DoIP(bytes(DoIP(payload_type=8)))

# Payload type 8 carries only a source address (two bytes of payload).
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (8, 2)
assert p.source_address == 0
| |
| = Build test 4001 |
| |
p = DoIP(bytes(DoIP(payload_type=0x4001)))

# Payload type 0x4001 builds to a header-only message (length 0).
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x4001, 0)
| |
| |
| = Build test 4002 |
| |
p = DoIP(bytes(DoIP(payload_type=0x4002)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x4002, 7)
assert p.node_type == 0
# max_open_sockets is the only field here whose default is not zero.
assert (p.max_open_sockets, p.cur_open_sockets) == (1, 0)
assert p.max_data_size == 0
| |
| |
| = Build test 4003 |
| |
p = DoIP(bytes(DoIP(payload_type=0x4003)))

# Payload type 0x4003 builds to a header-only message (length 0).
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x4003, 0)
| |
| |
| = Build test 4004 |
| |
p = DoIP(bytes(DoIP(payload_type=0x4004)))

# Payload type 0x4004 carries a single power-mode byte.
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x4004, 1)
assert p.diagnostic_power_mode == 0
| |
| = Build test 8001 |
| |
p = DoIP(bytes(DoIP(payload_type=0x8001)))

# With no upper layer attached, the payload is just the two addresses.
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x8001, 4)
assert (p.source_address, p.target_address) == (0, 0)
| |
| = Build test 8002 |
| |
# Default packet: addresses plus ACK code, no previous message attached.
p = DoIP(bytes(DoIP(payload_type=0x8002)))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x8002, 5)
assert (p.source_address, p.target_address, p.ack_code) == (0, 0, 0)
assert p.previous_msg == b''

# Three previous-message bytes raise the payload length from 5 to 8.
p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x22\xfd\x32')))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x8002, 8)
assert (p.source_address, p.target_address, p.ack_code) == (0, 0, 0)
assert p.previous_msg == b'\x22\xfd\x32'

# Five previous-message bytes raise the payload length to 10.
p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x19\x02\x09\x9C\x00')))

assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x8002, 10)
assert (p.source_address, p.target_address, p.ack_code) == (0, 0, 0)
assert p.previous_msg == b'\x19\x02\t\x9c\x00'

# Dissect a hand-written frame instead of a built one.
p = DoIP(b'\x02\xfd\x80\x02\x00\x00\x00\x07\x00\x08\x00\x0e\x00\x10\x01')
assert (p.protocol_version, p.inverse_version) == (0x02, 0xFD)
assert (p.payload_type, p.payload_length) == (0x8002, 7)
assert (p.source_address, p.target_address) == (0x8, 0xE)
assert p.ack_code == 0
assert p.previous_msg == b'\x10\x01'
| |
| = Build test 8003 |
| |
# Default packet: addresses plus NACK code, no previous message attached.
p = DoIP(bytes(DoIP(payload_type=0x8003)))

assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 5
assert p.payload_type == 0x8003
assert p.source_address == 0
assert p.target_address == 0
assert p.nack_code == 0
# Mirror the 0x8002 test: with a payload length of 5 there is no room for
# previous-message bytes, so the field must dissect as empty.
assert p.previous_msg == b''


# Five previous-message bytes raise the payload length from 5 to 10.
p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x2E\xfd\x32\x01\x02')))

assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 10
assert p.payload_type == 0x8003
assert p.source_address == 0
assert p.target_address == 0
assert p.nack_code == 0
assert p.previous_msg == b'.\xfd2\x01\x02'

p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x19\x02\x09\x9A\x00')))

assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 10
assert p.payload_type == 0x8003
assert p.source_address == 0
assert p.target_address == 0
assert p.nack_code == 0
assert p.previous_msg == b'\x19\x02\t\x9a\x00'

# Dissect a hand-written frame instead of a built one.
p = DoIP(b'\x02\xfd\x80\x03\x00\x00\x00\x07\x00\x0A\x00\x0C\x00\x10\x03')
assert p.protocol_version == 0x02
assert p.inverse_version == 0xFD
assert p.payload_length == 7
assert p.payload_type == 0x8003
assert p.source_address == 0xA
assert p.target_address == 0xC
assert p.nack_code == 0
assert p.previous_msg == b'\x10\x03'
| |
| + pcap based tests |
| |
| = read diag_ack pcap file |
# Only the first frame of the capture is inspected by the following test.
ack_capture = rdpcap(scapy_path("test/pcaps/doip_ack.pcap"))
pkt = ack_capture.res[0]

assert len(pkt) == 70
| |
| = dissect test of diag ACK with previous_msg field filled |
# Generic header of the captured frame.
assert (pkt.protocol_version, pkt.inverse_version) == (0x02, 0xFD)
assert pkt.payload_length == 8
# Addressing, ACK code and the three previous-message bytes that follow.
assert (pkt.source_address, pkt.target_address) == (0x4B, 0xE00)
assert pkt.ack_code == 0
assert pkt.previous_msg == b'\x22\xFD\x31'
| |
| |
| = read main pcap file |
| |
pkts = rdpcap(scapy_path("test/pcaps/doip.pcap.gz"))
# Keep only frames whose IP protocol number is 6 (TCP).
ips = list(filter(lambda frame: frame.proto == 6, pkts))

# The following tests need at least a request and a response.
assert len(ips) >= 2
| |
| = dissect test of routing activation pkts req |
| |
# The first TCP frame of the capture is the routing activation request.
p = req = ips[0]
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x5, 11)
assert (p.source_address, p.activation_type, p.reserved_iso) == (0xe80, 0, 0)
assert p.reserved_oem == b"\x00\x00\x00\x00"
| |
| = dissect test of routing activation pkts resp |
| |
# The second TCP frame of the capture is the routing activation response.
p = resp = ips[1]
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x6, 9)
assert (p.logical_address_tester, p.logical_address_doip_entity) == (0xe80, 0x4010)
assert p.routing_activation_response == 16
assert p.reserved_iso == 0
| |
| = answers test of routing activation pkts |
| |
# Request and response must share a hash and be recognised as a matching pair.
assert req.hashret() == resp.hashret()
assert resp.answers(req)
| |
| = dissect diagnostic message |
| |
req, resp = ips[-4], ips[-1]

# Request: payload type 0x8001 from 0xe80 to 0x4010, carrying UDS service 0x10.
p = req
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x8001, 6)
assert (p.source_address, p.target_address) == (0xe80, 0x4010)
assert bytes(p)[-2:] == bytes(UDS()/UDS_DSC(b"\x02"))
assert (p.service, p.diagnosticSessionType) == (0x10, 2)

# Response: same payload type in the opposite direction, UDS service 0x50.
p = resp
assert (p.protocol_version, p.inverse_version) == (0x02, 0xfd)
assert (p.payload_type, p.payload_length) == (0x8001, 10)
assert (p.target_address, p.source_address) == (0xe80, 0x4010)
assert bytes(p)[-6:] == bytes(UDS()/UDS_DSCPR(b"\x02\x002\x01\xf4"))
assert (p.service, p.diagnosticSessionType) == (0x50, 2)

assert resp.hashret() == req.hashret()
# Start at layer index 3 so the TCP layer is excluded from the answers check.
assert resp[3].answers(req[3])
assert not req[3].answers(resp[3])
| |
| = TCPSession Test |
| |
tmp_file = get_temp_file()

# The DoIP header announces a 6-byte payload but this segment carries only the
# first byte after the addresses; the byte in the second segment completes it.
first_segment = IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, seq=1) / DoIP(payload_type=0x8001, payload_length=6) / b"\x3E"
second_segment = IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, dport=13400, seq=14) / Raw(load=b"\xff")
wrpcap(tmp_file, [first_segment, second_segment])

# Replaying the capture through TCPSession must yield a dissected UDS layer.
pkts = sniff(offline=tmp_file, session=TCPSession)
reassembled = pkts[0]
assert reassembled.haslayer(UDS_TP)
assert reassembled.service == 0x3E
| |
| = TCPSession Test multiple DoIP messages |
| |
# Path relative to the Scapy root, spelled like the other scapy_path() calls
# in this campaign (no leading slash).
filename = scapy_path("test/pcaps/multiple_doip_layers.pcap.gz")

pkts = sniff(offline=filename, session=TCPSession)
# Check the count before indexing so a short capture fails with this assertion
# rather than with an IndexError from the debug output below.
assert len(pkts) == 2
print(repr(pkts[0]))
print(repr(pkts[1]))
# The first reassembled packet holds two stacked DoIP layers.
assert pkts[0][DoIP].payload_length == 2
assert pkts[0][DoIP:2].payload_length == 7
assert pkts[1][DoIP].payload_length == 103
| |
| + DoIP Communication tests |
| |
| = Load libraries |
| import base64 |
| import ssl |
| import tempfile |
| |
| = Test DoIPSocket |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TCP client on 127.0.0.1:13400 and send two DoIP messages.

    ``server_up`` is set once the socket is listening. The data is sent after
    ``sniff_up`` fires, or after one second at the latest.
    """
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('127.0.0.1', 13400))
        sock.listen(1)
        server_up.set()
        connection, address = sock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may transmit only part of it.
            connection.sendall(buffer)
    finally:
        sock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP test server did not start"
sock = DoIPSocket(activate_routing=False)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| |
| = Test DoIPSocket 2 |
| ~ linux |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TCP client on 127.0.0.1:13400 and send byte by byte.

    ``server_up`` is set once the socket is listening. Sending starts after
    ``sniff_up`` fires, or after one second at the latest.
    """
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('127.0.0.1', 13400))
        sock.listen(1)
        server_up.set()
        connection, address = sock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # One byte per write, 10 ms apart, so the client receives the
            # stream in the smallest possible pieces.
            for offset in range(len(buffer)):
                connection.sendall(buffer[offset:offset + 1])
                time.sleep(0.01)
    finally:
        sock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP test server did not start"
sock = DoIPSocket(activate_routing=False)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| = Test DoIPSocket 3 |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TCP client on 127.0.0.1:13400 and send random-sized chunks.

    ``server_up`` is set once the socket is listening. Sending starts after
    ``sniff_up`` fires, or after one second at the latest.
    """
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('127.0.0.1', 13400))
        sock.listen(1)
        server_up.set()
        connection, address = sock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            while buffer:
                # At least one byte per round: an empty chunk puts nothing on
                # the wire and would only burn a 10 ms sleep.
                randlen = random.randint(1, len(buffer))
                connection.sendall(buffer[:randlen])
                buffer = buffer[randlen:]
                time.sleep(0.01)
    finally:
        sock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP test server did not start"
sock = DoIPSocket(activate_routing=False)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| |
| = Test DoIPSocket6 |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TCP client on [::1]:13400 and send two DoIP messages.

    ``server_up`` is set once the socket is listening. The data is sent after
    ``sniff_up`` fires, or after one second at the latest.
    """
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('::1', 13400))
        sock.listen(1)
        server_up.set()
        connection, address = sock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may transmit only part of it.
            connection.sendall(buffer)
    finally:
        sock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP test server did not start"
sock = DoIPSocket(ip="::1", activate_routing=False)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| = Test DoIPSslSocket |
| ~ broken_windows |
| |
| certstring = """ |
| LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB |
| QVNDQktZd2dnU2lBZ0VBQW9JQkFRRFUvK0hRbVpzSDl2QVcKQ3ZMQjRxalpnZFJSSXE1b2JBanB4 |
| YUhoUGxCVEMvUlBzMHIxRVF0V0FtbXNEZFE3UGlLaCtYa1hES3pNY3lJSQp1a0ZpNThUQW1idGFj |
| N0U5VmJHSnNlTWp2RkJKSkFqQXVtbFdRZk5XcSs2TkZhdmRkTDQrSTNBTVJ5TldJTkJYCjhHMzRo |
| dldIbDdTOGhhSFFZN0FXcUZWVTNVL2xKR2pubnF3MEJraEIvVGRCTWIwM0habzkrVjIrWU9RZmk5 |
| QWsKTVRSRXpSeWVObWJqT0sxbHpXdFJXWkZZU0RnMEtqUVh4SkdFNVc5MzFPWitHL1NkbytTM1ZW |
| SVRPdWxQbHRmVwpXMEdjeCsvZERSNFIxNG5mcUl5L1daMElHUVNXMlRsQytmeGJ0dURDUkFqelRz |
| b0J3YjJ0cnpoR0VtYVFveUtNCnpBKzVSUHNyQWdNQkFBRUNnZ0VBRUJHaEoyWm5OVHh5YVY5TnZY |
| QjI1NDNZQnRUMGVSUHBhanJLMXg0bk1OU3oKNE9LNFVzWlo1MnBnTHRHT1EzZm1aS0l0cEo1WlY1 |
| cVBUejdwN3VjUzhnQWNZUnNJUnpCMHA5d3FpWExMK3h0RApxUjB4dnR4VDJpUGlFblVNNndudHpr |
| SHpKK0g0QkZLT2FvdjNaK3Fha2E1UmFCcmhheGRuaDBDNklLQmZtM3cyCm5zUWI2N0lCYWwrSnBs |
| L1g5TENWRkdRT2owb0lmVWI5ZFp3OWQ3MCthSGVVb2xvMGdYZmxxcXFFcnl3ZDlPN2QKNnp4dGlx |
| cnRyZUJhK1IraWs3NE1SK0xvaFNVR3o2VTRQaXhWQ3l1SnQ2U0hvRHR2L3dtSnltWDd2a0FRS2w1 |
| RQplK1JqUGVyakpUWTNzNXNXbEd2V21UTEtEbnVyS2pBYzZUOHhKb0pXWlFLQmdRRHdsd2RRdmww |
| S28wNHhDUmtiCklYRGVJZE1jZkp2ejRGZEtka1BmVnZVT2xHVEpNZkRzbWNoUzZhcEJCQUdQMUU2 |
| VkN2VzJmUFdjaXhScHE3MW8KR2xtbWZ5RnlJRW0rL08yamMvSFRXWHp6Qjdoc0JISEltQklHczFU |
| TC9iWFU3amhVQW5kWDdMK3RSRDBKNWRGVwpiN1VOOXNxaWdtRG42REJWZkxaUHgxRnlWUUtCZ1FE |
| aXBIT1BhNmVMSlk5R1FZdkw3OTIyTHNoU3ZYSUFVMERGCjBabTlqbjM2b3ZIY0kvWEZDdHVXank2 |
| WG9wbk9pbjlycmtUY2FDUnBvSEFNb00ycHdiR0tFY0dVVEY2RHQ3akYKRHVnd2srR21sbDkrbjM2 |
| M3Iwb09YNktSbWFhRStiZHoyNjNQVEhMaktYUnFyc3h5WEtMT3ZyTXhVNWNzMXJCeQpTMWI2ZGhr |
| M2Z3S0JnRjlONUliMnNkS3ArQ3B5aVRCM0ljZk1yRjBuZTN1ekRjRWdjaWlCd05lQ3J4NElHNEVP |
| Ck5nMnFKRmhXNXV0NzFaa3kyenpyNlR1VzJJSTNsdk1ySlFKUWNBWk9oZ2dURjJ2ZFhSazA1TXM4 |
| N3JCVFhtTncKNGdzbmROck42UDZ0VTBEc0xTeDJTME91dVdNM1Y2S2U0NkRoZDBuQ3pmSnZ4dDNH |
| WmszYURnaDFBb0dBWFhIcQpoNDZlZEx1V3VDUGNUTWhvUkc1RGdBSEdHQ1k3UlpTbTY4WHRZVUov |
| c0FGUG10OWdMRko2cG1DUFE5NU1yUXdjCkxqZnVFM0xuMy8wSTd0NENvbWV4eGNBN0U5blRIOFNH |
| clVpN3QrQzJITklNQUJZUTFaNU91L042K2Nhd0FkL28KYU5rZllWTzlRU015L2svOWZIcWFEVk5t |
| dUVFSVhRZDlKQ1UvUG1jQ2dZQWI0RTBRWTdDZmlrV293OFIzSlhoZgo0MHFVVkdud09QKzJNbXE5 |
| d2ZmWkpTRHNFSTQvb2g0VGRnN0sybHNNazVsWnRaMyszTjljSDVUc1pMYlJtd2FMCm9sRVl6K1BB |
| WU91MlMrY1l2bFlNL0V2WmlpRHJybjZuTStNbTNnaXJPYkNwMzcxd1ZxRFVsUnB4OUlwWVdYcnAK |
| T3YxUXFHdXkwODdyQkk1cStWL3hqQT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0KLS0tLS1C |
| RUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ3VENDQXRXZ0F3SUJBZ0lVVTNsendsTVNSa294Tkdk |
| SFJzZllIcUtxcDAwd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2dZVXhDekFKQmdOVkJBWVRBa1JGTVJN |
| d0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSApEQU5TUlVjeEVUQVBCZ05WQkFv |
| TUNHUnBjM05sWTNSdk1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NCkJGUkZVMVF4SXpB |
| aEJna3Foa2lHOXcwQkNRRVdGR052Ym5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUI0WERUSTAKTURN |
| eE9ERTVNek13TlZvWERUSTBNRFF4TnpFNU16TXdOVm93Z1lVeEN6QUpCZ05WQkFZVEFrUkZNUk13 |
| RVFZRApWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSERBTlNSVWN4RVRBUEJnTlZCQW9N |
| Q0dScGMzTmxZM1J2Ck1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NQkZSRlUxUXhJekFo |
| QmdrcWhraUc5dzBCQ1FFV0ZHTnYKYm5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUlJQklqQU5CZ2tx |
| aGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBMVAvaDBKbWJCL2J3RmdyeXdlS28yWUhV |
| VVNLdWFHd0k2Y1doNFQ1UVV3djBUN05LOVJFTFZnSnByQTNVCk96NGlvZmw1Rnd5c3pITWlDTHBC |
| WXVmRXdKbTdXbk94UFZXeGliSGpJN3hRU1NRSXdMcHBWa0h6VnF2dWpSV3IKM1hTK1BpTndERWNq |
| VmlEUVYvQnQrSWIxaDVlMHZJV2gwR093RnFoVlZOMVA1U1JvNTU2c05BWklRZjAzUVRHOQpOeDJh |
| UGZsZHZtRGtINHZRSkRFMFJNMGNualptNHppdFpjMXJVVm1SV0VnNE5DbzBGOFNSaE9WdmQ5VG1m |
| aHYwCm5hUGt0MVZTRXpycFQ1YlgxbHRCbk1mdjNRMGVFZGVKMzZpTXYxbWRDQmtFbHRrNVF2bjhX |
| N2Jnd2tRSTgwN0sKQWNHOXJhODRSaEpta0tNaWpNd1B1VVQ3S3dJREFRQUJvMU13VVRBZEJnTlZI |
| UTRFRmdRVVZhbUFkUjR1ZW8zQgpmV0RjUlMyUkQ3OEtlZXd3SHdZRFZSMGpCQmd3Rm9BVVZhbUFk |
| UjR1ZW8zQmZXRGNSUzJSRDc4S2Vld3dEd1lEClZSMFRBUUgvQkFVd0F3RUIvekFOQmdrcWhraUc5 |
| dzBCQVFzRkFBT0NBUUVBRjE1TTNvL3RyUVdYeHdHamlxZjgKNXBUTEM0bHJwQkZaTFZDbStQdHd4 |
| aENlN1ZSd2dLMElBb01EMW0vSjNEYnVJSjVURXlTVElnR2N0WHVNbG5pWgpsY3IwekZOZVVhQ08w |
| YkdhaExYUXpCWTRxSkhTTUNWNnhiNXNqUDlEdk9HYnFxbHVTbk51ZFJ5UWNIbkd4SE0rCk1adXpO |
| WUNseklOMEtYbFJuSTZqRXUrcG9XZ0pEMGN1NFM2b1lwT2R3bElRYmtaNnIrUE1jQ3hpRmhRd3E2 |
| em4KcE1nQzB0WlpSM3pCOEpVcTJwRHlGVy9jVlFjWkp5YUhnQkkwWlJWWG5wbDFqYng2YlNIOCts |
| cnMxVk1xZDlkcQozd1BMcjBheWI2VkpNa29WMjNWSXAzLzlYQVpTR3Z6Y0dadnM2VThSUTdFbUtx |
| akJibWxudm1CTkpUMk9xbFFRCllRPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo=""" |
| |
| certstring = certstring.replace('\n', '') |
| |
def _load_certificate_chain(context) -> None:
    """Load the embedded test key and certificate into *context*.

    The base64 text in ``certstring`` is decoded and written to a temporary
    file, because ``load_cert_chain`` takes a file path. The file is removed
    again afterwards so the private key does not stay on disk.

    :param context: object providing ``load_cert_chain(path)``, e.g. an
        ``ssl.SSLContext``.
    """
    import os

    # delete=False: the file has to be closed before it is read back by name,
    # so removal is done by hand in the finally clause.
    fp = tempfile.NamedTemporaryFile(delete=False)
    try:
        with fp:
            fp.write(base64.b64decode(certstring))
        context.load_cert_chain(fp.name)
    finally:
        os.unlink(fp.name)
| |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TLS client on 127.0.0.1:3496 and send two DoIP messages.

    ``server_up`` is set once the socket is listening. The data is sent after
    ``sniff_up`` fires, or after one second at the latest.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    _load_certificate_chain(context)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ssock = context.wrap_socket(sock)
    except Exception:
        # Wrapping failed, so nothing else will close the raw socket.
        sock.close()
        raise
    try:
        ssock.bind(('127.0.0.1', 3496))
        ssock.listen(1)
        server_up.set()
        connection, address = ssock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may transmit only part of it.
            connection.sendall(buffer)
    finally:
        ssock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP TLS test server did not start"
# The test certificate is self-signed, so verification is switched off.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket(activate_routing=False, force_tls=True, context=context)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| = Test DoIPSslSocket6 |
| ~ broken_windows |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TLS client on [::1]:3496 and send two DoIP messages.

    ``server_up`` is set once the socket is listening. The data is sent after
    ``sniff_up`` fires, or after one second at the latest.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    _load_certificate_chain(context)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ssock = context.wrap_socket(sock)
    except Exception:
        # Wrapping failed, so nothing else will close the raw socket.
        sock.close()
        raise
    try:
        ssock.bind(('::1', 3496))
        ssock.listen(1)
        server_up.set()
        connection, address = ssock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may transmit only part of it.
            connection.sendall(buffer)
    finally:
        ssock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP TLS test server did not start"
# The test certificate is self-signed, so verification is switched off.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| = Test UDS_DoIPSslSocket6 |
| ~ broken_windows |
| |
server_up = threading.Event()
sniff_up = threading.Event()


def server():
    """Accept one TLS client on [::1]:3496 and send two DoIP messages.

    ``server_up`` is set once the socket is listening. The data is sent after
    ``sniff_up`` fires, or after one second at the latest.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    _load_certificate_chain(context)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Two back-to-back DoIP messages: payload type 0x8002, then 0x8001.
    buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ssock = context.wrap_socket(sock)
    except Exception:
        # Wrapping failed, so nothing else will close the raw socket.
        sock.close()
        raise
    try:
        ssock.bind(('::1', 3496))
        ssock.listen(1)
        server_up.set()
        connection, address = ssock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may transmit only part of it.
            connection.sendall(buffer)
    finally:
        ssock.close()


server_thread = threading.Thread(target=server)
server_thread.start()
# Fail with a clear message if the server never started listening.
assert server_up.wait(timeout=1), "DoIP TLS test server did not start"
# The test certificate is self-signed, so verification is switched off.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_thread.join(timeout=1)
assert len(pkts) == 2
| |
| = Test UDS_DualDoIPSslSocket6 |
| ~ broken_windows not_pypy |
| |
server_tcp_up = threading.Event()
server_tls_up = threading.Event()
sniff_up = threading.Event()


def server_tls():
    """Accept one TLS client on [::1]:3496 and send three DoIP messages.

    ``server_tls_up`` is set once the socket is listening. The data is sent
    after ``sniff_up`` fires, or after one second at the latest.
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    _load_certificate_chain(context)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    # Payload type 0x0006 message whose response byte is 0x10.
    # NOTE(review): presumably "routing activation accepted" - confirm
    # against the DoIP layer.
    buffer = bytes.fromhex("02fd0006000000090e8011061000000000")
    # Followed by payload type 0x8002 and then 0x8001.
    buffer += b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        ssock = context.wrap_socket(sock)
    except Exception:
        # Wrapping failed, so nothing else will close the raw socket.
        sock.close()
        raise
    try:
        ssock.bind(('::1', 3496))
        ssock.listen(1)
        server_tls_up.set()
        connection, address = ssock.accept()
        # The context manager closes the connection even if sending fails.
        with connection:
            sniff_up.wait(timeout=1)
            # sendall() keeps writing until the whole buffer is out, whereas
            # send() may transmit only part of it.
            connection.sendall(buffer)
    finally:
        ssock.close()


def server_tcp():
    """Accept one plain TCP client on [::1]:13400 and send one DoIP message.

    ``server_tcp_up`` is set once the socket is listening. The reply is sent
    immediately after accepting; this server does not wait for ``sniff_up``.
    """
    # Payload type 0x0006 message whose response byte is 0x07.
    # NOTE(review): presumably tells the client to reconnect over TLS -
    # confirm against the DoIP layer.
    buffer = bytes.fromhex("02fd0006000000090e8011060700000000")
    sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
    try:
        sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('::1', 13400))
        sock.listen(1)
        server_tcp_up.set()
        connection, address = sock.accept()
        # The context manager closes the connection even if sending or the
        # shutdown fails.
        with connection:
            connection.sendall(buffer)
            connection.shutdown(socket.SHUT_RDWR)
    finally:
        sock.close()


server_tcp_thread = threading.Thread(target=server_tcp)
server_tcp_thread.start()
# Fail with a clear message if either server never started listening.
assert server_tcp_up.wait(timeout=1), "DoIP TCP test server did not start"
server_tls_thread = threading.Thread(target=server_tls)
server_tls_thread.start()
assert server_tls_up.wait(timeout=1), "DoIP TLS test server did not start"
# The test certificate is self-signed, so verification is switched off.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE


sock = UDS_DoIPSocket(ip="::1", context=context)
try:
    pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
finally:
    # Release the client socket instead of leaving it to garbage collection.
    sock.close()
server_tcp_thread.join(timeout=1)
server_tls_thread.join(timeout=1)
assert len(pkts) == 2