blob: 32ab7bc369bb0c8937fd8a157ec243fc892994e4 [file] [log] [blame]
% Regression tests for Simulated ECUs and UDS Scanners
~ scanner
+ Configuration
~ conf
= Imports
import io
import pickle
from scapy.contrib.isotp import ISOTPMessageBuilder
from test.testsocket import TestSocket, cleanup_testsockets, UnstableSocket
from scapy.automaton import ObjectPipe
############
############
+ Load general modules
= Load contribution layer
from scapy.contrib.automotive.uds import *
from scapy.contrib.automotive.uds_ecu_states import *
from scapy.contrib.automotive.uds_scan import *
from scapy.contrib.automotive.ecu import *
load_layer("can")
conf.debug_dissector = False
= Define Testfunction
def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, **kwargs):
tester_obj_pipe = ObjectPipe(name="TesterPipe")
ecu_obj_pipe = ObjectPipe(name="ECUPipe")
TesterSocket = UnstableSocket if unstable_socket else TestSocket
tester = TesterSocket(UDS, tester_obj_pipe)
ecu = TestSocket(UDS, ecu_obj_pipe)
tester.pair(ecu)
answering_machine = EcuAnsweringMachine(
supported_responses=supported_responses, main_socket=ecu,
basecls=UDS, verbose=False)
def reset():
answering_machine.state.reset()
answering_machine.state["session"] = 1
sniff(timeout=0.001, opened_socket=[ecu, tester])
def reconnect():
try:
tester.close()
except Exception:
pass
tester = TesterSocket(UDS, tester_obj_pipe)
ecu.pair(tester)
return tester
def answering_machine_thread():
answering_machine(
timeout=120, stop_filter=lambda x: bytes(x) == b"\xff\xff\xff")
sim = threading.Thread(target=answering_machine_thread)
try:
sim.start()
scanner = UDS_Scanner(
tester, reset_handler=reset, reconnect_handler=reconnect,
test_cases=enumerators, timeout=0.1,
retry_if_none_received=True, unittest=True,
**kwargs)
for i in range(12):
print("Starting scan")
scanner.scan(timeout=10)
if scanner.scan_completed:
print("Scan completed after %d iterations" % i)
break
finally:
ecu.ins.send(Raw(b"\xff\xff\xff"))
sim.join(timeout=2)
assert not sim.is_alive()
cleanup_testsockets()
tester_obj_pipe.close()
ecu_obj_pipe.close()
if LINUX:
pickle_test(scanner)
return scanner
def pickle_test(scanner):
f = io.BytesIO()
pickle.dump(scanner, f)
unp = pickle.loads(f.getvalue())
assert scanner.scan_completed == unp.scan_completed
assert scanner.state_paths == unp.state_paths
= Load packets from pcap
conf.contribs['CAN']['swap-bytes'] = True
pkts = rdpcap(scapy_path("test/pcaps/candump_uds_scanner.pcap.gz"))
assert len(pkts)
= Create UDS messages from packets
builder = ISOTPMessageBuilder(basecls=UDS, use_ext_address=False, rx_id=[0x641, 0x651])
msgs = list()
for p in pkts:
if p.data == b"ECURESET":
msgs.append(p)
else:
builder.feed(p)
if len(builder):
msgs.append(builder.pop())
assert len(msgs)
= Create ECU-Clone from packets
mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True, lookahead=3)
for p in msgs:
if isinstance(p, CAN) and p.data == b"ECURESET":
mEcu.reset()
else:
mEcu.update(p)
assert len(mEcu.supported_responses)
= Test UDS_SAEnumerator evaluate_response
e = UDS_SAEnumerator()
config = {}
s = EcuState(session=1)
assert False == e._evaluate_response(s, UDS(b"\x27\x01"), None, **config)
config = {"exit_if_service_not_supported": True}
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x11"), **config)
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x24"), **config)
assert e._retry_pkt[s] == UDS(b"\x27\x01")
assert False == e._evaluate_response(s, UDS(b"\x27\x02"), UDS(b"\x7f\x27\x24"), **config)
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config)
assert e._retry_pkt[s] == UDS(b"\x27\x01")
assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config)
assert not e._retry_pkt[s]
assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x01ab"), **config)
assert not e._retry_pkt[s]
assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x02ab"), **config)
assert not e._retry_pkt[s]
= Test UDS_SA_XOR_Enumerator stand alone mode
TesterSocket = TestSocket
ecu_sock = TestSocket(UDS)
mTester = TesterSocket(UDS)
ecu_sock.pair(mTester)
answering_machine = EcuAnsweringMachine(supported_responses=mEcu.supported_responses, main_socket=ecu_sock, basecls=UDS, verbose=False)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 1000, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"})
sim.start()
try:
resp = mTester.sr1(UDS()/UDS_TP(b"\x00"), verbose=False, timeout=1)
print(repr(resp))
assert resp and resp.service != 0x7f
resp = mTester.sr1(UDS()/UDS_DSC(diagnosticSessionType=3), verbose=False, timeout=1)
print(repr(resp))
assert resp and resp.service != 0x7f
assert UDS_SA_XOR_Enumerator().get_security_access(mTester, 1)
finally:
mTester.send(Raw(b"\xff\xff\xff"))
sim.join(timeout=2)
cleanup_testsockets()
= Test configuration validation
try:
scanner = UDS_Scanner(TestSocket(UDS),
test_cases=[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator],
UDS_DSCEnumerator_kwargs={"scan_range": range(0x1000), "delay_state_change": 0,
"overwrite_timeout": False})
assert False
except Scapy_Exception:
pass
= Simulate ECU and run Scanner
scanner = executeScannerInVirtualEnvironment(
mEcu.supported_responses,
[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator],
UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0,
"overwrite_timeout": False},
UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)},
UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22,
0x23, 0x24, 0x27, 0x28, 0x29,
0x2A, 0x2C, 0x2E, 0x2F, 0x31,
0x34, 0x35, 0x36, 0x37, 0x38,
0x3D, 0x3E, 0x83, 0x84, 0x85,
0x87],
"request_length": 1})
scanner.show_testcases()
scanner.show_testcases_status()
assert len(scanner.state_paths) == 5
assert scanner.scan_completed
assert scanner.progress() > 0.95
assert EcuState(session=1) in scanner.final_states
assert EcuState(session=2, tp=1) in scanner.final_states
assert EcuState(session=3, tp=1) in scanner.final_states
assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states
assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states
#################### UDS_SA_XOR_Enumerator ################
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 19
assert len(tc.results_with_positive_response) >= 6
assert len(tc.scanned_states) == 5
result = tc.show(dump=True)
assert "serviceNotSupportedInActiveSession received 5 times" in result
assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result
################# UDS_DSCEnumerator #####################
tc = scanner.configuration.test_cases[1]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 20
assert len(tc.results_with_positive_response) == 5
assert len(tc.scanned_states) == 5
result = tc.show(dump=True)
assert "incorrectMessageLengthOrInvalidFormat received 20 times" in result
###################### UDS_ServiceEnumerator ###################
tc = scanner.configuration.test_cases[2]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 130
assert len(tc.results_with_positive_response) == 0
assert len(tc.scanned_states) == 5
result = tc.show(dump=True)
assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result
assert "serviceNotSupported received 75 times" in result
assert "serviceNotSupportedInActiveSession received 19 times" in result
assert "securityAccessDenied received 2 times" in result
= UDS_ServiceEnumerator
def req_handler(resp, req):
if req.service != 0x22:
return False
if len(req) == 1:
resp.negativeResponseCode="generalReject"
return True
if len(req) == 2:
resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat"
return True
if len(req) == 3:
resp.negativeResponseCode="requestOutOfRange"
return True
return False
resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)]
es = [UDS_ServiceEnumerator]
debug_dissector_backup = conf.debug_dissector
# This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector
conf.debug_dissector = False
scanner = executeScannerInVirtualEnvironment(
resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3}, unstable_socket=False)
conf.debug_dissector = debug_dissector_backup
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
tc.show()
assert len(tc.results_with_negative_response) == 128 * 3
assert len(tc.results_with_positive_response) == 0
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "incorrectMessageLengthOrInvalidFormat" in result
assert "requestOutOfRange" in result
= UDS_RDBIEnumerator
resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
es = [UDS_RDBIEnumerator]
scanner = executeScannerInVirtualEnvironment(
resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "asdfbeef1" in result
assert "beef2" in result
assert "beef3" in result
assert "beefff" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0xff in ids
= UDS_RDBISelectiveEnumerator
~ not_pypy
resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x101)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x102)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x103)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x104)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x105)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x106)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x107)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x108)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x109)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x110)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x111)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x112)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x113)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x114)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x115)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x116)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x117)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x118)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x119)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x120)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x121)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x122)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x123)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x124)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x125)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x126)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x127)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x128)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x129)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x130)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x131)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x132)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x133)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x134)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x135)/Raw(b"beef35")]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
es = [UDS_RDBISelectiveEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RDBIRandomEnumerator_kwargs={"probe_start": 0, "probe_end": 0x500})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.stages[0][0]
tc.show()
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) > 100
assert len(tc.results_with_positive_response) >= 1
assert len(tc.scanned_states) == 1
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.stages[0][1]
tc.show()
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 29
assert len(tc.results_with_positive_response) == 35
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "asdfbeef1" in result
assert "beef2" in result
assert "beef3" in result
assert "beef35" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
assert 0x101 in ids
assert 0x102 in ids
assert 0x103 in ids
assert 0x135 in ids
= UDS_WDBIEnumerator
def wdbi_handler(resp, req):
if req.service != 0x2E:
return False
assert req.dataIdentifier in [1, 2, 3, 0xff]
resp.dataIdentifier = req.dataIdentifier
if req.dataIdentifier == 1:
assert req.load == b'asdfbeef1'
return True
if req.dataIdentifier == 2:
assert req.load == b'beef2'
return True
if req.dataIdentifier == 3:
assert req.load == b"beef3"
return True
if req.dataIdentifier == 0xff:
assert req.load == b"beefff"
return True
return False
resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
es = [UDS_WDBISelectiveEnumerator()]
scanner = executeScannerInVirtualEnvironment(
resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.stages[0][0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "asdfbeef1" in result
assert "beef2" in result
assert "beef3" in result
assert "beefff" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0xff in ids
######################### WDBI #############################
tc = scanner.configuration.stages[0][1]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
ids = [t.req.dataIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0xff in ids
= UDS_WDBIEnumerator 2
def wdbi_handler(resp, req):
if req.service != 0x2E:
return False
assert req.dataIdentifier in [1, 2, 3, 0xff]
resp.dataIdentifier = req.dataIdentifier
if req.dataIdentifier == 1:
assert req.load == b'asdfbeef1'
return True
if req.dataIdentifier == 2:
assert req.load == b'beef2'
return True
if req.dataIdentifier == 3:
assert req.load == b"beef3"
return True
if req.dataIdentifier == 0xff:
assert req.load == b"beefff"
return True
return False
resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])]
es = [UDS_WDBISelectiveEnumerator]
scanner = executeScannerInVirtualEnvironment(
resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0][0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "asdfbeef1" in result
assert "beef2" in result
assert "beef3" in result
assert "beefff" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.identifiers[0] for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0xff in ids
######################### WDBI #############################
tc = scanner.configuration.test_cases[0][1]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
ids = [t.req.dataIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0xff in ids
= UDS_TPEnumerator
resps = [EcuResponse(None, [UDS()/UDS_TPPR()]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="serviceNotSupported", requestServiceId="TesterPresent")])]
es = [UDS_TPEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 2
assert tc.show(dump=True)
= UDS_EREnumerator
resps = [EcuResponse(None, [UDS()/UDS_ERPR(resetType=1)]),
EcuResponse(None, [UDS()/UDS_ERPR(resetType=3)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ECUReset")])]
es = [UDS_EREnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 2
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "hardReset" in result
assert "softReset" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.resetType for t in tc.results_with_positive_response]
assert 1 in ids
assert 3 in ids
= UDS_CCEnumerator
resps = [EcuResponse(None, [UDS()/UDS_CCPR(controlType=1)]),
EcuResponse(None, [UDS()/UDS_CCPR(controlType=3)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="CommunicationControl")])]
es = [UDS_CCEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es, inter=0.001)
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 2
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "enableRxAndDisableTx" in result
assert "disableRxAndTx" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.controlType for t in tc.results_with_positive_response]
assert 1 in ids
assert 3 in ids
= UDS_RDBPIEnumerator
UDS_RDBPI.periodicDataIdentifiers[1] = "identifierElectric"
UDS_RDBPI.periodicDataIdentifiers[3] = "identifierGas"
resps = [EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=1, dataRecord=b'electric')]),
EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=3, dataRecord=b'gas')]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataPeriodicIdentifier")])]
es = [UDS_RDBPIEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 2
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "electric" in result
assert "gas" in result
assert "0x01 identifierElectric" in result
assert "0x03 identifierGas" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.periodicDataIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
assert 3 in ids
= UDS_RCEnumerator
resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=2)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=3)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])]
es = [UDS_RCEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCEnumerator_kwargs={"scan_range": range(0x11)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0x11 * 3 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "subFunctionNotSupported received" in result
ids = [t.req.routineIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0x10 in ids
= UDS_RCSelectiveEnumerator
resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=1)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=1)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])]
es = [UDS_RCSelectiveEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCStartEnumerator_kwargs={"scan_range": range(0x11)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.stages[0][0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0x11 - 2
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "subFunctionNotSupported received" in result
ids = [t.req.routineIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
assert 0x10 in ids
tc = scanner.configuration.stages[0][1]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 538
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "subFunctionNotSupported received" in result
ids = [t.req.routineIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
= UDS_IOCBIEnumerator
resps = [EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=2)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=3)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=0xff)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="InputOutputControlByIdentifier")])]
es = [UDS_IOCBIEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es, UDS_IOCBIEnumerator_kwargs={"scan_range": range(0x100)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0x100 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "asdfbeef1" in result
assert "beef2" in result
assert "beef3" in result
assert "beefff" in result
assert "subFunctionNotSupported received" in result
ids = [t.req.dataIdentifier for t in tc.results_with_positive_response]
assert 1 in ids
assert 2 in ids
assert 3 in ids
assert 0xff in ids
= UDS_RDEnumerator
memory = dict()
for addr in itertools.chain(range(0x1f00), range(0xd000, 0xfff2), range(0xa000, 0xcf00), range(0x2000, 0x5f00)):
memory[addr] = addr & 0xff
def answers_rd(resp, req):
global memory
if req.service != 0x34:
return False
if req.memorySizeLen in [1, 3, 4]:
return False
if req.memoryAddressLen in [1, 3, 4]:
return False
addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen)
if addr not in memory.keys():
return False
resp.memorySizeLen = req.memorySizeLen
return True
resps = [EcuResponse(None, [UDS()/UDS_RDPR()], answers=answers_rd),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="RequestDownload")])]
#######################################################
scanner = executeScannerInVirtualEnvironment(
resps, [UDS_RDEnumerator], unstable_socket=False,
UDS_RDEnumerator_kwargs={"unittest": True})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc1 = scanner.configuration.test_cases[0]
assert len(tc1.results_without_response) < 10
if len(tc1.results_without_response):
tc1.show()
assert len(tc1.results_with_negative_response) > 400
assert len(tc1.results_with_positive_response) > 40
assert len(tc1.scanned_states) == 1
result = tc1.show(dump=True)
assert "requestOutOfRange received " in result
= UDS_RMBARandomEnumerator
pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt(4, 4, 10)
assert pkt.memorySizeLen == 4
assert pkt.memoryAddressLen == 4
assert pkt.memorySize4 == 10
assert pkt.memoryAddress4 is not None
pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt()
assert pkt.memorySizeLen in [1, 2, 3, 4]
assert pkt.memoryAddressLen in [1, 2, 3, 4]
pkt2 = UDS_RMBARandomEnumerator._random_memory_addr_pkt()
assert pkt != pkt2
= UDS_RMBAEnumerator
~ linux not_pypy
memory = dict()
mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)]
mem_ranges = [range(s, e) for s, e in mem_areas]
mem_inner_borders = [s for s, _ in mem_areas]
mem_inner_borders += [e - 1 for _, e in mem_areas]
mem_outer_borders = [s - 1 for s, _ in mem_areas]
mem_outer_borders += [e for _, e in mem_areas]
mem_random_test_points = []
for _ in range(100):
mem_random_test_points += [random.choice(list(itertools.chain(*mem_ranges)))]
for addr in itertools.chain(*mem_ranges):
memory[addr] = addr & 0xff
def answers_rmba(resp, req):
global memory
if req.service != 0x23:
return False
if req.memorySizeLen in [1, 3, 4]:
return False
if req.memoryAddressLen in [1, 3, 4]:
return False
addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen)
if addr not in memory.keys():
return False
out_mem = list()
size = getattr(req, "memorySize%d" % req.memorySizeLen)
for i in range(addr, addr + size):
try:
out_mem.append(memory[i])
except KeyError:
pass
resp.dataRecord = bytes(out_mem)
return True
resps = [EcuResponse(None, [UDS()/UDS_RMBAPR(dataRecord=b'')], answers=answers_rmba),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="ReadMemoryByAddress")])]
#######################################################
scanner = executeScannerInVirtualEnvironment(
resps, [UDS_RMBAEnumerator], unstable_socket=False,
UDS_RMBARandomEnumerator_kwargs={"unittest": True})
assert scanner.scan_completed
tc1 = scanner.configuration.stages[0][1]
assert len(tc1.results_without_response) < 30
if len(tc1.results_without_response):
tc1.show()
assert len(tc1.results_with_negative_response) > 10
assert len(tc1.results_with_positive_response) > 300
assert len(tc1.scanned_states) == 1
result = tc1.show(dump=True)
assert "requestOutOfRange received " in result
############################################################
addrs = tc1._get_memory_addresses_from_results(tc1.results_with_positive_response)
print(float([tp in addrs for tp in mem_inner_borders].count(True)) / len(mem_inner_borders))
assert float([tp in addrs for tp in mem_inner_borders].count(True)) / len(mem_inner_borders) > 0.8
print(float([tp in addrs for tp in mem_random_test_points].count(True)) / len(mem_random_test_points))
assert float([tp in addrs for tp in mem_random_test_points].count(True)) / len(mem_random_test_points) > 0.8
print(float([tp not in addrs for tp in mem_outer_borders].count(True)) / len(mem_outer_borders))
assert float([tp not in addrs for tp in mem_outer_borders].count(True)) / len(mem_outer_borders) > 0.7
= UDS_TDEnumerator
resps = [EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=1)]),
EcuResponse(None, [UDS()/UDS_TDPR(blockSequenceCounter=3)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="TransferData")])]
es = [UDS_TDEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es)
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 256 - 2
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "subFunctionNotSupported received" in result
ids = [t.req.blockSequenceCounter for t in tc.results_with_positive_response]
assert 1 in ids
assert 3 in ids
= BMW_DevJobEnumerator
load_contrib("automotive.bmw.definitions")
load_contrib("automotive.bmw.enumerator")
resps = [EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff00)/Raw(b"asdfbeef1")]),
EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff02)/Raw(b"beef2")]),
EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xff03)/Raw(b"beef3")]),
EcuResponse(None, [UDS()/DEV_JOB_PR(identifier=0xffff)/Raw(b"beefff")]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="DevelopmentJob")])]
es = [BMW_DevJobEnumerator]
scanner = executeScannerInVirtualEnvironment(resps, es, BMW_DevJobEnumerator_kwargs={"scan_range": range(0xFF00, 0x10000)})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
assert len(tc.results_with_negative_response) == 0x100 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "ReadTransportMessageStatus" in result
assert "65282" in result
assert "65283" in result
assert "ReadMemory" in result
assert "subFunctionNotSupported received" in result
assert "PR: Supported" in result
ids = [t.req.identifier for t in tc.results_with_positive_response]
assert 0xff00 in ids
assert 0xff02 in ids
assert 0xff03 in ids
assert 0xffff in ids
= UDS_ServiceEnumerator weird issue
resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode=0x13, requestServiceId=0x40)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x41)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x11)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x42)]),
EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId=0x43)])]
es = [UDS_ServiceEnumerator]
scanner = executeScannerInVirtualEnvironment(
resps, es, UDS_ServiceEnumerator_kwargs={"scan_range": [0x11, 0x40, 0x41, 0x42]})
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
tc.show()
assert len(tc.results_with_negative_response) == 4
= UDS_ServiceEnumerator, all range
def req_handler(resp, req):
if req.service != 0x22:
return False
if len(req) == 1:
resp.negativeResponseCode="generalReject"
return True
if len(req) == 2:
resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat"
return True
if len(req) == 3:
resp.negativeResponseCode="requestOutOfRange"
return True
return False
resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)]
es = [UDS_ServiceEnumerator]
debug_dissector_backup = conf.debug_dissector
# This Enumerator is sending corrupted Packets, therefore we need to disable the debug_dissector
conf.debug_dissector = False
scanner = executeScannerInVirtualEnvironment(
resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3, "scan_range": range(256)}, unstable_socket=False)
conf.debug_dissector = debug_dissector_backup
assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
assert len(tc.results_without_response) < 10
if tc.results_without_response:
tc.show()
tc.show()
assert len(tc.scanned_states) == 1
result = tc.show(dump=True)
assert "incorrectMessageLengthOrInvalidFormat" in result
assert "requestOutOfRange" in result
+ Cleanup
= Delete testsockets
cleanup_testsockets()