blob: d2eea1e3aeb79b7571f9328be8a492eb54e264d8 [file] [log] [blame]
% Regression tests for EcuAnsweringMachine
+ Configuration
~ conf
= Imports
from test.testsocket import TestSocket, cleanup_testsockets
############
############
+ Load general modules
= Load contribution layer
load_contrib("automotive.uds", globals_dict=globals())
load_contrib("automotive.ecu", globals_dict=globals())
load_contrib("automotive.uds_ecu_states", globals_dict=globals())
ecu = TestSocket(UDS)
tester = TestSocket(UDS)
ecu.pair(tester)
+ Simulator tests
= Simple check with RDBI and Negative Response
example_responses = \
[EcuResponse([EcuState(session=1)], responses=UDS() / UDS_RDBIPR(dataIdentifier=0x1234) / Raw(b"deadbeef"))]
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS, verbose=False)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[0x123]), timeout=1, verbose=False)
assert resp.negativeResponseCode == 0x10
assert resp.requestServiceId == 34
resp = tester.sr1(UDS(service=0x22), timeout=1, verbose=False)
assert resp.negativeResponseCode == 0x10
assert resp.requestServiceId == 34
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[0x1234]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 0x1234
assert resp.load == b"deadbeef"
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
= Simple check with different Sessions
example_responses = \
[EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
EcuResponse(EcuState(session=[3, 4]), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
EcuResponse(EcuState(session=[5, 6, 7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
EcuResponse(EcuState(session=[8, 9]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))]
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.negativeResponseCode == 0x10
assert resp.requestServiceId == 34
answering_machine.state.session = 2
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 2
assert resp.load == b"deadbeef1"
answering_machine.state.session = 4
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 3
assert resp.load == b"deadbeef2"
answering_machine.state.session = 6
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 5
assert resp.load == b"deadbeef3"
answering_machine.state.session = 9
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 9
assert resp.load == b"deadbeef4"
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
= Simple check with different Sessions and diagnosticSessionControl
example_responses = \
[EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
EcuResponse(EcuState(session=9), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=2, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=4, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=5, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=6, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=7, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=8, sessionParameterRecord=b"dead")),
EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")),
EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.negativeResponseCode == 0x10
assert resp.requestServiceId == 34
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 2
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 2
assert resp.load == b"deadbeef1"
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 4
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 3
assert resp.load == b"deadbeef2"
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 6
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 5
assert resp.load == b"deadbeef3"
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 8
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 9
assert resp.sessionParameterRecord == b"dead1"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 9
assert resp.load == b"deadbeef4"
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
= Simple check with different Sessions and diagnosticSessionControl and answers hook
def custom_answers(resp, req):
if req.service + 0x40 != resp.service:
return False
if hasattr(req, "diagnosticSessionType"):
if 0 < req.diagnosticSessionType <= 8:
resp.diagnosticSessionType = req.diagnosticSessionType
return resp.answers(req)
return False
example_responses = \
[EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
EcuResponse(EcuState(session=[9, 10]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")),
EcuResponse(EcuState(session=range(0,8)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead"), answers=custom_answers),
EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")),
EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.negativeResponseCode == 0x10
assert resp.requestServiceId == 34
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 2
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 2
assert resp.load == b"deadbeef1"
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 4
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 3
assert resp.load == b"deadbeef2"
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 6
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 5
assert resp.load == b"deadbeef3"
resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 8
assert resp.sessionParameterRecord == b"dead"
resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False)
assert resp.service == 0x50
assert resp.diagnosticSessionType == 9
assert resp.sessionParameterRecord == b"dead1"
resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False)
assert resp.service == 0x62
assert resp.dataIdentifier == 9
assert resp.load == b"deadbeef4"
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
= Simple check with security access and answers hook
security_seed = b"abcd"
def custom_answers(resp, req):
global security_seed
if req.service + 0x40 != resp.service or req.service != 0x27:
return False
if req.securityAccessType == 1:
resp.securitySeed = security_seed
return resp.answers(req)
elif req.securityAccessType == 2:
return resp.answers(req) and req.securityKey == security_seed + security_seed
return False
example_responses = \
[EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234"), answers=custom_answers),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
assert resp.service == 0x67
assert resp.securitySeed == b"abcd"
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=1, verbose=False)
assert resp.service == 0x7f
assert resp.negativeResponseCode == 0x35
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
assert resp.service == 0x67
assert resp.securitySeed == b"abcd"
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False)
assert resp.service == 0x67
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
= Simple check with security access and answers hook and request-correctly-received message
security_seed = b"abcd"
def custom_answers(resp, req):
global security_seed
if req.service + 0x40 != resp.service or req.service != 0x27:
return False
if req.securityAccessType == 1:
resp.securitySeed = security_seed
return resp.answers(req)
elif req.securityAccessType == 2:
return resp.answers(req) and req.securityKey == security_seed + security_seed
return False
example_responses = \
[EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False)
assert resp.service == 0x67
assert resp.securitySeed == b"abcd"
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=2, verbose=False)
assert resp.service == 0x7f
assert resp.negativeResponseCode == 0x35
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False)
assert resp.service == 0x67
assert resp.securitySeed == b"abcd"
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=2, verbose=False)
assert resp.service == 0x67
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
= Simple check with security access and answers hook and request-correctly-received message 2
security_seed = b"abcd"
def custom_answers(resp, req):
global security_seed
if req.service + 0x40 != resp.service or req.service != 0x27:
return False
if req.securityAccessType == 1:
resp.securitySeed = security_seed
return resp.answers(req)
elif req.securityAccessType == 2:
return resp.answers(req) and req.securityKey == security_seed + security_seed
return False
example_responses = \
[EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)),
EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))]
conf.contribs['UDS']['treat-response-pending-as-answer'] = True
success = False
answering_machine = EcuAnsweringMachine(supported_responses=example_responses,
main_socket=ecu, basecls=UDS)
sim = threading.Thread(target=answering_machine, kwargs={'timeout':5, 'stop_filter': lambda p: p.service==0xff})
sim.start()
try:
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
assert resp.service == 0x7f
assert resp.negativeResponseCode == 0x78
resp = tester.sniff(timeout=2, count=1, verbose=False)[0]
assert resp.service == 0x67
assert resp.securitySeed == b"abcd"
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=3, verbose=False)
assert resp.service == 0x7f
assert resp.negativeResponseCode == 0x35
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False)
assert resp.service == 0x7f
assert resp.negativeResponseCode == 0x78
resp = tester.sniff(timeout=2, count=1, verbose=False)[0]
assert resp.service == 0x67
assert resp.securitySeed == b"abcd"
resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False)
assert resp.service == 0x67
success = True
except Exception as ex:
print(ex)
finally:
tester.send(UDS(service=0xff))
sim.join(timeout=10)
assert success
conf.contribs['UDS']['treat-response-pending-as-answer'] = False
+ Cleanup
= Delete TestSockets
cleanup_testsockets()