| % Regression tests for EcuAnsweringMachine |
| |
| + Configuration |
| ~ conf |
| |
| = Imports |
| |
| from test.testsocket import TestSocket, cleanup_testsockets |
| |
| ############ |
| ############ |
| + Load general modules |
| |
| = Load contribution layer |
| |
| load_contrib("automotive.uds", globals_dict=globals()) |
| load_contrib("automotive.ecu", globals_dict=globals()) |
| load_contrib("automotive.uds_ecu_states", globals_dict=globals()) |
| |
| ecu = TestSocket(UDS) |
| tester = TestSocket(UDS) |
| ecu.pair(tester) |
| |
| + Simulator tests |
| |
| = Simple check with RDBI and Negative Response |
| |
| example_responses = \ |
| [EcuResponse([EcuState(session=1)], responses=UDS() / UDS_RDBIPR(dataIdentifier=0x1234) / Raw(b"deadbeef"))] |
| |
| success = False |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS, verbose=False) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[0x123]), timeout=1, verbose=False) |
| assert resp.negativeResponseCode == 0x10 |
| assert resp.requestServiceId == 34 |
| resp = tester.sr1(UDS(service=0x22), timeout=1, verbose=False) |
| assert resp.negativeResponseCode == 0x10 |
| assert resp.requestServiceId == 34 |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[0x1234]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 0x1234 |
| assert resp.load == b"deadbeef" |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| = Simple check with different Sessions |
| |
| example_responses = \ |
| [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), |
| EcuResponse(EcuState(session=[3, 4]), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), |
| EcuResponse(EcuState(session=[5, 6, 7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), |
| EcuResponse(EcuState(session=[8, 9]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))] |
| |
| success = False |
| |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.negativeResponseCode == 0x10 |
| assert resp.requestServiceId == 34 |
| answering_machine.state.session = 2 |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 2 |
| assert resp.load == b"deadbeef1" |
| answering_machine.state.session = 4 |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 3 |
| assert resp.load == b"deadbeef2" |
| answering_machine.state.session = 6 |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 5 |
| assert resp.load == b"deadbeef3" |
| answering_machine.state.session = 9 |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 9 |
| assert resp.load == b"deadbeef4" |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| = Simple check with different Sessions and diagnosticSessionControl |
| |
| example_responses = \ |
| [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), |
| EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), |
| EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), |
| EcuResponse(EcuState(session=9), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=2, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=3, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=4, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=5, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=6, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=7, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(0,8))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=8, sessionParameterRecord=b"dead")), |
| EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")), |
| EcuResponse([EcuState(), EcuState(session=range(8,10))], responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] |
| |
| success = False |
| |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.negativeResponseCode == 0x10 |
| assert resp.requestServiceId == 34 |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 2 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 2 |
| assert resp.load == b"deadbeef1" |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 4 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 3 |
| assert resp.load == b"deadbeef2" |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 6 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 5 |
| assert resp.load == b"deadbeef3" |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 8 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 9 |
| assert resp.sessionParameterRecord == b"dead1" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 9 |
| assert resp.load == b"deadbeef4" |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| = Simple check with different Sessions and diagnosticSessionControl and answers hook |
| |
| def custom_answers(resp, req): |
| if req.service + 0x40 != resp.service: |
| return False |
| if hasattr(req, "diagnosticSessionType"): |
| if 0 < req.diagnosticSessionType <= 8: |
| resp.diagnosticSessionType = req.diagnosticSessionType |
| return resp.answers(req) |
| return False |
| |
| example_responses = \ |
| [EcuResponse(EcuState(session=2), responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")), |
| EcuResponse(EcuState(session=range(3,5)), responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")), |
| EcuResponse(EcuState(session=[5,6,7]), responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")), |
| EcuResponse(EcuState(session=[9, 10]), responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4")), |
| EcuResponse(EcuState(session=range(0,8)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=1, sessionParameterRecord=b"dead"), answers=custom_answers), |
| EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead1")), |
| EcuResponse(EcuState(session=range(8,10)), responses=UDS() / UDS_DSCPR(diagnosticSessionType=9, sessionParameterRecord=b"dead2")), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] |
| |
| success = False |
| |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS()/UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.negativeResponseCode == 0x10 |
| assert resp.requestServiceId == 34 |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=2), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 2 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 2 |
| assert resp.load == b"deadbeef1" |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=4), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 4 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 3 |
| assert resp.load == b"deadbeef2" |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=6), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 6 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 5 |
| assert resp.load == b"deadbeef3" |
| resp = tester.sr1(UDS()/UDS_DSC(diagnosticSessionType=8), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 8 |
| assert resp.sessionParameterRecord == b"dead" |
| resp = tester.sr1(UDS() / UDS_DSC(diagnosticSessionType=9), timeout=1, verbose=False) |
| assert resp.service == 0x50 |
| assert resp.diagnosticSessionType == 9 |
| assert resp.sessionParameterRecord == b"dead1" |
| resp = tester.sr1(UDS() / UDS_RDBI(identifiers=[2, 3, 5, 9]), timeout=1, verbose=False) |
| assert resp.service == 0x62 |
| assert resp.dataIdentifier == 9 |
| assert resp.load == b"deadbeef4" |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| = Simple check with security access and answers hook |
| |
| security_seed = b"abcd" |
| |
| def custom_answers(resp, req): |
| global security_seed |
| if req.service + 0x40 != resp.service or req.service != 0x27: |
| return False |
| if req.securityAccessType == 1: |
| resp.securitySeed = security_seed |
| return resp.answers(req) |
| elif req.securityAccessType == 2: |
| return resp.answers(req) and req.securityKey == security_seed + security_seed |
| return False |
| |
| example_responses = \ |
| [EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234"), answers=custom_answers), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] |
| |
| success = False |
| |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) |
| assert resp.service == 0x67 |
| assert resp.securitySeed == b"abcd" |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=1, verbose=False) |
| assert resp.service == 0x7f |
| assert resp.negativeResponseCode == 0x35 |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) |
| assert resp.service == 0x67 |
| assert resp.securitySeed == b"abcd" |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False) |
| assert resp.service == 0x67 |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| = Simple check with security access and answers hook and request-correctly-received message |
| |
| security_seed = b"abcd" |
| |
| def custom_answers(resp, req): |
| global security_seed |
| if req.service + 0x40 != resp.service or req.service != 0x27: |
| return False |
| if req.securityAccessType == 1: |
| resp.securitySeed = security_seed |
| return resp.answers(req) |
| elif req.securityAccessType == 2: |
| return resp.answers(req) and req.securityKey == security_seed + security_seed |
| return False |
| |
| example_responses = \ |
| [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] |
| |
| success = False |
| |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, main_socket=ecu, basecls=UDS) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 10, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False) |
| assert resp.service == 0x67 |
| assert resp.securitySeed == b"abcd" |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=2, verbose=False) |
| assert resp.service == 0x7f |
| assert resp.negativeResponseCode == 0x35 |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=2, verbose=False) |
| assert resp.service == 0x67 |
| assert resp.securitySeed == b"abcd" |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=2, verbose=False) |
| assert resp.service == 0x67 |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| = Simple check with security access and answers hook and request-correctly-received message 2 |
| |
| security_seed = b"abcd" |
| |
| def custom_answers(resp, req): |
| global security_seed |
| if req.service + 0x40 != resp.service or req.service != 0x27: |
| return False |
| if req.securityAccessType == 1: |
| resp.securitySeed = security_seed |
| return resp.answers(req) |
| elif req.securityAccessType == 2: |
| return resp.answers(req) and req.securityKey == security_seed + security_seed |
| return False |
| |
| example_responses = \ |
| [EcuResponse(EcuState(session=range(0,255)), responses=[UDS()/UDS_NR(negativeResponseCode=0x78, requestServiceId=0x27), UDS() / UDS_SAPR(securityAccessType=1, securitySeed=b"1234")], answers=custom_answers), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_SAPR(securityAccessType=2), answers=custom_answers), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x35, requestServiceId=0x27)), |
| EcuResponse(EcuState(session=range(0,255)), responses=UDS() / UDS_NR(negativeResponseCode=0x7f, requestServiceId=0x10))] |
| |
| conf.contribs['UDS']['treat-response-pending-as-answer'] = True |
| |
| success = False |
| |
| answering_machine = EcuAnsweringMachine(supported_responses=example_responses, |
| main_socket=ecu, basecls=UDS) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout':5, 'stop_filter': lambda p: p.service==0xff}) |
| sim.start() |
| try: |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) |
| assert resp.service == 0x7f |
| assert resp.negativeResponseCode == 0x78 |
| resp = tester.sniff(timeout=2, count=1, verbose=False)[0] |
| assert resp.service == 0x67 |
| assert resp.securitySeed == b"abcd" |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed), timeout=3, verbose=False) |
| assert resp.service == 0x7f |
| assert resp.negativeResponseCode == 0x35 |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=1), timeout=1, verbose=False) |
| assert resp.service == 0x7f |
| assert resp.negativeResponseCode == 0x78 |
| resp = tester.sniff(timeout=2, count=1, verbose=False)[0] |
| assert resp.service == 0x67 |
| assert resp.securitySeed == b"abcd" |
| resp = tester.sr1(UDS() / UDS_SA(securityAccessType=2, securityKey=resp.securitySeed+resp.securitySeed), timeout=1, verbose=False) |
| assert resp.service == 0x67 |
| success = True |
| except Exception as ex: |
| print(ex) |
| finally: |
| tester.send(UDS(service=0xff)) |
| sim.join(timeout=10) |
| |
| assert success |
| |
| conf.contribs['UDS']['treat-response-pending-as-answer'] = False |
| |
| + Cleanup |
| |
| = Delete TestSockets |
| |
| cleanup_testsockets() |