blob: 9a7d61e3b7def98c8bf43d68b3793eadc4f7f1bf [file] [log] [blame]
% Regression tests for the DoIP layer
# More information at http://www.secdev.org/projects/UTscapy/
############
############
+ Doip contrib tests
= Load Contrib Layer
load_contrib("automotive.doip", globals_dict=globals())
load_contrib("automotive.uds", globals_dict=globals())
= Defaults test
p = DoIP(payload_type=1)
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == None
assert p.payload_type == 1
= Build test 0
p = DoIP(bytes(DoIP(payload_type=0)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 1
assert p.payload_type == 0
assert p.nack == 0
= Build test 1
p = DoIP(bytes(DoIP(payload_type=1)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 0
assert p.payload_type == 1
= Build test 2
p = DoIP(bytes(DoIP(payload_type=2)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 6
assert p.payload_type == 2
assert bytes(p.eid) == b"\x00" * 6
= Build test 3
p = DoIP(bytes(DoIP(payload_type=3)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 17
assert p.payload_type == 3
assert bytes(p.vin) == b"\x00" * 17
= Build test 4
p = DoIP(bytes(DoIP(payload_type=4)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 33
assert p.payload_type == 4
assert bytes(p.vin) == b"\x00" * 17
assert p.logical_address == 0
assert bytes(p.eid) == b"\x00" * 6
assert bytes(p.gid) == b"\x00" * 6
assert p.further_action == 0
assert p.vin_gid_status == 0
= Build test 5
p = DoIP(bytes(DoIP(payload_type=5)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 7
assert p.payload_type == 5
assert p.source_address == 0
assert p.activation_type == 0
assert p.reserved_iso == 0
assert p.reserved_oem == b""
= Build test 5.1
p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"1234")))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 11
assert p.payload_type == 5
assert p.source_address == 0
assert p.activation_type == 0
assert p.reserved_iso == 0
p.show()
print(p.reserved_oem)
assert p.reserved_oem == b"1234"
= Build test 5.2
p = DoIP(bytes(DoIP(payload_type=5, reserved_oem=b"12")))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 9
assert p.payload_type == 5
assert p.source_address == 0
assert p.activation_type == 0
assert p.reserved_iso == 0
assert p.reserved_oem == b"12"
= Build test 6
p = DoIP(bytes(DoIP(payload_type=6)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 9
assert p.payload_type == 6
assert p.logical_address_tester == 0
assert p.logical_address_doip_entity == 0
assert p.reserved_iso == 0
assert p.reserved_oem == b""
= Build test 7
p = DoIP(bytes(DoIP(payload_type=7)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 0
assert p.payload_type == 7
= Build test 8
p = DoIP(bytes(DoIP(payload_type=8)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 2
assert p.payload_type == 8
assert p.source_address == 0
= Build test 4001
p = DoIP(bytes(DoIP(payload_type=0x4001)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 0
assert p.payload_type == 0x4001
= Build test 4002
p = DoIP(bytes(DoIP(payload_type=0x4002)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 7
assert p.payload_type == 0x4002
assert p.node_type == 0
assert p.max_open_sockets == 1
assert p.cur_open_sockets == 0
assert p.max_data_size == 0
= Build test 4003
p = DoIP(bytes(DoIP(payload_type=0x4003)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 0
assert p.payload_type == 0x4003
= Build test 4004
p = DoIP(bytes(DoIP(payload_type=0x4004)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 1
assert p.payload_type == 0x4004
assert p.diagnostic_power_mode == 0
= Build test 8001
p = DoIP(bytes(DoIP(payload_type=0x8001)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 4
assert p.payload_type == 0x8001
assert p.source_address == 0
assert p.target_address == 0
= Build test 8002
p = DoIP(bytes(DoIP(payload_type=0x8002)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 5
assert p.payload_type == 0x8002
assert p.source_address == 0
assert p.target_address == 0
assert p.ack_code == 0
assert p.previous_msg == b''
p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x22\xfd\x32')))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 8
assert p.payload_type == 0x8002
assert p.source_address == 0
assert p.target_address == 0
assert p.ack_code == 0
assert p.previous_msg == b'\x22\xfd\x32'
p = DoIP(bytes(DoIP(payload_type=0x8002, previous_msg=b'\x19\x02\x09\x9C\x00')))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 10
assert p.payload_type == 0x8002
assert p.source_address == 0
assert p.target_address == 0
assert p.ack_code == 0
assert p.previous_msg == b'\x19\x02\t\x9c\x00'
p = DoIP(b'\x02\xfd\x80\x02\x00\x00\x00\x07\x00\x08\x00\x0e\x00\x10\x01')
assert p.protocol_version == 0x02
assert p.inverse_version == 0xFD
assert p.payload_length == 7
assert p.payload_type == 0x8002
assert p.source_address == 0x8
assert p.target_address == 0xE
assert p.ack_code == 0
assert p.previous_msg == b'\x10\x01'
= Build test 8003
p = DoIP(bytes(DoIP(payload_type=0x8003)))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 5
assert p.payload_type == 0x8003
assert p.source_address == 0
assert p.target_address == 0
assert p.nack_code == 0
p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x2E\xfd\x32\x01\x02')))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 10
assert p.payload_type == 0x8003
assert p.source_address == 0
assert p.target_address == 0
assert p.nack_code == 0
assert p.previous_msg == b'.\xfd2\x01\x02'
p = DoIP(bytes(DoIP(payload_type=0x8003, previous_msg=b'\x19\x02\x09\x9A\x00')))
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 10
assert p.payload_type == 0x8003
assert p.source_address == 0
assert p.target_address == 0
assert p.nack_code == 0
assert p.previous_msg == b'\x19\x02\t\x9a\x00'
p = DoIP(b'\x02\xfd\x80\x03\x00\x00\x00\x07\x00\x0A\x00\x0C\x00\x10\x03')
assert p.protocol_version == 0x02
assert p.inverse_version == 0xFD
assert p.payload_length == 7
assert p.payload_type == 0x8003
assert p.source_address == 0xA
assert p.target_address == 0xC
assert p.nack_code == 0
assert p.previous_msg == b'\x10\x03'
+ pcap based tests
= read diag_ack pcap file
pkt = rdpcap(scapy_path("test/pcaps/doip_ack.pcap")).res[0]
assert len(pkt) == 70
= dissect test of diag ACK with previous_msg field filled
assert pkt.protocol_version == 0x02
assert pkt.inverse_version == 0xFD
assert pkt.payload_length == 8
assert pkt.source_address == 0x4B
assert pkt.target_address == 0xE00
assert pkt.ack_code == 0
assert pkt.previous_msg == b'\x22\xFD\x31'
= read main pcap file
pkts = rdpcap(scapy_path("test/pcaps/doip.pcap.gz"))
ips = [p for p in pkts if p.proto == 6]
assert len(ips) > 1
= dissect test of routing activation pkts req
req = ips[0]
p = req
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 11
assert p.payload_type == 0x5
assert p.source_address == 0xe80
assert p.activation_type == 0
assert p.reserved_iso == 0
assert p.reserved_oem == b"\x00\x00\x00\x00"
= dissect test of routing activation pkts resp
resp = ips[1]
p = resp
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 9
assert p.payload_type == 0x6
assert p.logical_address_tester == 0xe80
assert p.logical_address_doip_entity == 0x4010
assert p.routing_activation_response == 16
assert p.reserved_iso == 0
= answers test of routing activation pkts
assert resp.answers(req)
assert resp.hashret() == req.hashret()
= dissect diagnostic message
req = ips[-4]
resp = ips[-1]
p = req
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 6
assert p.payload_type == 0x8001
assert p.source_address == 0xe80
assert p.target_address == 0x4010
assert bytes(p)[-2:] == bytes(UDS()/UDS_DSC(b"\x02"))
assert p.service == 0x10
assert p.diagnosticSessionType == 2
p = resp
assert p.protocol_version == 0x02
assert p.inverse_version == 0xfd
assert p.payload_length == 10
assert p.payload_type == 0x8001
assert p.target_address == 0xe80
assert p.source_address == 0x4010
assert bytes(p)[-6:] == bytes(UDS()/UDS_DSCPR(b"\x02\x002\x01\xf4"))
assert p.service == 0x50
assert p.diagnosticSessionType == 2
assert req.hashret() == resp.hashret()
# exclude TCP layer from answers check
assert resp[3].answers(req[3])
assert not req[3].answers(resp[3])
= TCPSession Test
tmp_file = get_temp_file()
wrpcap(tmp_file, [
IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, seq=1) / DoIP(payload_type=0x8001, payload_length=6) / b"\x3E",
IP(src="10.10.10.10", dst="10.10.10.11") / TCP(sport=61000, dport=13400, seq=14) / Raw(load=b"\xff")
])
pkts = sniff(offline=tmp_file, session=TCPSession)
assert pkts[0].haslayer(UDS_TP)
assert pkts[0].service == 0x3E
= TCPSession Test multiple DoIP messages
filename = scapy_path("/test/pcaps/multiple_doip_layers.pcap.gz")
pkts = sniff(offline=filename, session=TCPSession)
print(repr(pkts[0]))
print(repr(pkts[1]))
assert len(pkts) == 2
assert pkts[0][DoIP].payload_length == 2
assert pkts[0][DoIP:2].payload_length == 7
assert pkts[1][DoIP].payload_length == 103
+ DoIP Communication tests
= Load libraries
import base64
import ssl
import tempfile
= Test DoIPSocket
server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 13400))
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
sock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSocket 2
~ linux
server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 13400))
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
for i in range(len(buffer)):
connection.send(buffer[i:i+1])
time.sleep(0.01)
connection.close()
finally:
sock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSocket 3
server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 13400))
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
while buffer:
randlen = random.randint(0, len(buffer))
connection.send(buffer[:randlen])
buffer = buffer[randlen:]
time.sleep(0.01)
connection.close()
finally:
sock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(activate_routing=False)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSocket6
server_up = threading.Event()
sniff_up = threading.Event()
def server():
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('::1', 13400))
sock.listen(1)
server_up.set()
connection, address = sock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
sock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
sock = DoIPSocket(ip="::1", activate_routing=False)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSslSocket
~ broken_windows
certstring = """
LS0tLS1CRUdJTiBQUklWQVRFIEtFWS0tLS0tCk1JSUV2QUlCQURBTkJna3Foa2lHOXcwQkFRRUZB
QVNDQktZd2dnU2lBZ0VBQW9JQkFRRFUvK0hRbVpzSDl2QVcKQ3ZMQjRxalpnZFJSSXE1b2JBanB4
YUhoUGxCVEMvUlBzMHIxRVF0V0FtbXNEZFE3UGlLaCtYa1hES3pNY3lJSQp1a0ZpNThUQW1idGFj
N0U5VmJHSnNlTWp2RkJKSkFqQXVtbFdRZk5XcSs2TkZhdmRkTDQrSTNBTVJ5TldJTkJYCjhHMzRo
dldIbDdTOGhhSFFZN0FXcUZWVTNVL2xKR2pubnF3MEJraEIvVGRCTWIwM0habzkrVjIrWU9RZmk5
QWsKTVRSRXpSeWVObWJqT0sxbHpXdFJXWkZZU0RnMEtqUVh4SkdFNVc5MzFPWitHL1NkbytTM1ZW
SVRPdWxQbHRmVwpXMEdjeCsvZERSNFIxNG5mcUl5L1daMElHUVNXMlRsQytmeGJ0dURDUkFqelRz
b0J3YjJ0cnpoR0VtYVFveUtNCnpBKzVSUHNyQWdNQkFBRUNnZ0VBRUJHaEoyWm5OVHh5YVY5TnZY
QjI1NDNZQnRUMGVSUHBhanJLMXg0bk1OU3oKNE9LNFVzWlo1MnBnTHRHT1EzZm1aS0l0cEo1WlY1
cVBUejdwN3VjUzhnQWNZUnNJUnpCMHA5d3FpWExMK3h0RApxUjB4dnR4VDJpUGlFblVNNndudHpr
SHpKK0g0QkZLT2FvdjNaK3Fha2E1UmFCcmhheGRuaDBDNklLQmZtM3cyCm5zUWI2N0lCYWwrSnBs
L1g5TENWRkdRT2owb0lmVWI5ZFp3OWQ3MCthSGVVb2xvMGdYZmxxcXFFcnl3ZDlPN2QKNnp4dGlx
cnRyZUJhK1IraWs3NE1SK0xvaFNVR3o2VTRQaXhWQ3l1SnQ2U0hvRHR2L3dtSnltWDd2a0FRS2w1
RQplK1JqUGVyakpUWTNzNXNXbEd2V21UTEtEbnVyS2pBYzZUOHhKb0pXWlFLQmdRRHdsd2RRdmww
S28wNHhDUmtiCklYRGVJZE1jZkp2ejRGZEtka1BmVnZVT2xHVEpNZkRzbWNoUzZhcEJCQUdQMUU2
VkN2VzJmUFdjaXhScHE3MW8KR2xtbWZ5RnlJRW0rL08yamMvSFRXWHp6Qjdoc0JISEltQklHczFU
TC9iWFU3amhVQW5kWDdMK3RSRDBKNWRGVwpiN1VOOXNxaWdtRG42REJWZkxaUHgxRnlWUUtCZ1FE
aXBIT1BhNmVMSlk5R1FZdkw3OTIyTHNoU3ZYSUFVMERGCjBabTlqbjM2b3ZIY0kvWEZDdHVXank2
WG9wbk9pbjlycmtUY2FDUnBvSEFNb00ycHdiR0tFY0dVVEY2RHQ3akYKRHVnd2srR21sbDkrbjM2
M3Iwb09YNktSbWFhRStiZHoyNjNQVEhMaktYUnFyc3h5WEtMT3ZyTXhVNWNzMXJCeQpTMWI2ZGhr
M2Z3S0JnRjlONUliMnNkS3ArQ3B5aVRCM0ljZk1yRjBuZTN1ekRjRWdjaWlCd05lQ3J4NElHNEVP
Ck5nMnFKRmhXNXV0NzFaa3kyenpyNlR1VzJJSTNsdk1ySlFKUWNBWk9oZ2dURjJ2ZFhSazA1TXM4
N3JCVFhtTncKNGdzbmROck42UDZ0VTBEc0xTeDJTME91dVdNM1Y2S2U0NkRoZDBuQ3pmSnZ4dDNH
WmszYURnaDFBb0dBWFhIcQpoNDZlZEx1V3VDUGNUTWhvUkc1RGdBSEdHQ1k3UlpTbTY4WHRZVUov
c0FGUG10OWdMRko2cG1DUFE5NU1yUXdjCkxqZnVFM0xuMy8wSTd0NENvbWV4eGNBN0U5blRIOFNH
clVpN3QrQzJITklNQUJZUTFaNU91L042K2Nhd0FkL28KYU5rZllWTzlRU015L2svOWZIcWFEVk5t
dUVFSVhRZDlKQ1UvUG1jQ2dZQWI0RTBRWTdDZmlrV293OFIzSlhoZgo0MHFVVkdud09QKzJNbXE5
d2ZmWkpTRHNFSTQvb2g0VGRnN0sybHNNazVsWnRaMyszTjljSDVUc1pMYlJtd2FMCm9sRVl6K1BB
WU91MlMrY1l2bFlNL0V2WmlpRHJybjZuTStNbTNnaXJPYkNwMzcxd1ZxRFVsUnB4OUlwWVdYcnAK
T3YxUXFHdXkwODdyQkk1cStWL3hqQT09Ci0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS0KLS0tLS1C
RUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUQ3VENDQXRXZ0F3SUJBZ0lVVTNsendsTVNSa294Tkdk
SFJzZllIcUtxcDAwd0RRWUpLb1pJaHZjTkFRRUwKQlFBd2dZVXhDekFKQmdOVkJBWVRBa1JGTVJN
d0VRWURWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSApEQU5TUlVjeEVUQVBCZ05WQkFv
TUNHUnBjM05sWTNSdk1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NCkJGUkZVMVF4SXpB
aEJna3Foa2lHOXcwQkNRRVdGR052Ym5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUI0WERUSTAKTURN
eE9ERTVNek13TlZvWERUSTBNRFF4TnpFNU16TXdOVm93Z1lVeEN6QUpCZ05WQkFZVEFrUkZNUk13
RVFZRApWUVFJREFwVGIyMWxMVk4wWVhSbE1Rd3dDZ1lEVlFRSERBTlNSVWN4RVRBUEJnTlZCQW9N
Q0dScGMzTmxZM1J2Ck1Rd3dDZ1lEVlFRTERBTkVSVll4RFRBTEJnTlZCQU1NQkZSRlUxUXhJekFo
QmdrcWhraUc5dzBCQ1FFV0ZHTnYKYm5SaFkzUXRkWE5BWkdsemMyVmpMblJ2TUlJQklqQU5CZ2tx
aGtpRzl3MEJBUUVGQUFPQ0FROEFNSUlCQ2dLQwpBUUVBMVAvaDBKbWJCL2J3RmdyeXdlS28yWUhV
VVNLdWFHd0k2Y1doNFQ1UVV3djBUN05LOVJFTFZnSnByQTNVCk96NGlvZmw1Rnd5c3pITWlDTHBC
WXVmRXdKbTdXbk94UFZXeGliSGpJN3hRU1NRSXdMcHBWa0h6VnF2dWpSV3IKM1hTK1BpTndERWNq
VmlEUVYvQnQrSWIxaDVlMHZJV2gwR093RnFoVlZOMVA1U1JvNTU2c05BWklRZjAzUVRHOQpOeDJh
UGZsZHZtRGtINHZRSkRFMFJNMGNualptNHppdFpjMXJVVm1SV0VnNE5DbzBGOFNSaE9WdmQ5VG1m
aHYwCm5hUGt0MVZTRXpycFQ1YlgxbHRCbk1mdjNRMGVFZGVKMzZpTXYxbWRDQmtFbHRrNVF2bjhX
N2Jnd2tRSTgwN0sKQWNHOXJhODRSaEpta0tNaWpNd1B1VVQ3S3dJREFRQUJvMU13VVRBZEJnTlZI
UTRFRmdRVVZhbUFkUjR1ZW8zQgpmV0RjUlMyUkQ3OEtlZXd3SHdZRFZSMGpCQmd3Rm9BVVZhbUFk
UjR1ZW8zQmZXRGNSUzJSRDc4S2Vld3dEd1lEClZSMFRBUUgvQkFVd0F3RUIvekFOQmdrcWhraUc5
dzBCQVFzRkFBT0NBUUVBRjE1TTNvL3RyUVdYeHdHamlxZjgKNXBUTEM0bHJwQkZaTFZDbStQdHd4
aENlN1ZSd2dLMElBb01EMW0vSjNEYnVJSjVURXlTVElnR2N0WHVNbG5pWgpsY3IwekZOZVVhQ08w
YkdhaExYUXpCWTRxSkhTTUNWNnhiNXNqUDlEdk9HYnFxbHVTbk51ZFJ5UWNIbkd4SE0rCk1adXpO
WUNseklOMEtYbFJuSTZqRXUrcG9XZ0pEMGN1NFM2b1lwT2R3bElRYmtaNnIrUE1jQ3hpRmhRd3E2
em4KcE1nQzB0WlpSM3pCOEpVcTJwRHlGVy9jVlFjWkp5YUhnQkkwWlJWWG5wbDFqYng2YlNIOCts
cnMxVk1xZDlkcQozd1BMcjBheWI2VkpNa29WMjNWSXAzLzlYQVpTR3Z6Y0dadnM2VThSUTdFbUtx
akJibWxudm1CTkpUMk9xbFFRCllRPT0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="""
certstring = certstring.replace('\n', '')
def _load_certificate_chain(context) -> None:
with tempfile.NamedTemporaryFile(delete=False) as fp:
fp.write(base64.b64decode(certstring))
fp.close()
context.load_cert_chain(fp.name)
server_up = threading.Event()
sniff_up = threading.Event()
def server():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock = context.wrap_socket(sock)
try:
ssock.bind(('127.0.0.1', 3496))
ssock.listen(1)
server_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
ssock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket(activate_routing=False, force_tls=True, context=context)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test DoIPSslSocket6
~ broken_windows
server_up = threading.Event()
sniff_up = threading.Event()
def server():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock = context.wrap_socket(sock)
try:
ssock.bind(('::1', 3496))
ssock.listen(1)
server_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
ssock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test UDS_DoIPSslSocket6
~ broken_windows
server_up = threading.Event()
sniff_up = threading.Event()
def server():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
buffer = b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock = context.wrap_socket(sock)
try:
ssock.bind(('::1', 3496))
ssock.listen(1)
server_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
ssock.close()
server_thread = threading.Thread(target=server)
server_thread.start()
server_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = UDS_DoIPSocket(ip="::1", activate_routing=False, force_tls=True, context=context)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_thread.join(timeout=1)
assert len(pkts) == 2
= Test UDS_DualDoIPSslSocket6
~ broken_windows not_pypy
server_tcp_up = threading.Event()
server_tls_up = threading.Event()
sniff_up = threading.Event()
def server_tls():
context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
_load_certificate_chain(context)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
buffer = bytes.fromhex("02fd0006000000090e8011061000000000")
buffer += b'\x02\xfd\x80\x02\x00\x00\x00\x05\x00\x00\x00\x00\x00\x02\xfd\x80\x01\x00\x00\x00\n\x10\x10\x0e\x80P\x03\x002\x01\xf4'
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ssock = context.wrap_socket(sock)
try:
ssock.bind(('::1', 3496))
ssock.listen(1)
server_tls_up.set()
connection, address = ssock.accept()
sniff_up.wait(timeout=1)
connection.send(buffer)
connection.close()
finally:
ssock.close()
def server_tcp():
buffer = bytes.fromhex("02fd0006000000090e8011060700000000")
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
try:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('::1', 13400))
sock.listen(1)
server_tcp_up.set()
connection, address = sock.accept()
connection.send(buffer)
connection.shutdown(socket.SHUT_RDWR)
connection.close()
finally:
sock.close()
server_tcp_thread = threading.Thread(target=server_tcp)
server_tcp_thread.start()
server_tcp_up.wait(timeout=1)
server_tls_thread = threading.Thread(target=server_tls)
server_tls_thread.start()
server_tls_up.wait(timeout=1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
sock = UDS_DoIPSocket(ip="::1", context=context)
pkts = sock.sniff(timeout=1, count=2, started_callback=sniff_up.set)
server_tcp_thread.join(timeout=1)
server_tls_thread.join(timeout=1)
assert len(pkts) == 2