| % Regression tests for Simulated ECUs and UDS Scanners |
| ~ scanner |
| |
| + Configuration |
| ~ conf |
| |
| = Imports |
| import io |
| import pickle |
| from scapy.contrib.isotp import ISOTPMessageBuilder |
| from test.testsocket import TestSocket, cleanup_testsockets, UnstableSocket |
| from scapy.automaton import ObjectPipe |
| |
| ############ |
| ############ |
| + Load general modules |
| |
| = Load contribution layer |
| |
| |
| from scapy.contrib.automotive.uds import * |
| from scapy.contrib.automotive.uds_ecu_states import * |
| from scapy.contrib.automotive.uds_scan import * |
| from scapy.contrib.automotive.ecu import * |
| |
| # Load Scapy's "can" layer. |
| load_layer("can") |
| |
| # Switch conf.debug_dissector off. |
| conf.debug_dissector = False |
| |
| |
| = Define Testfunction |
| |
| # Run a UDS_Scanner against a simulated ECU (EcuAnsweringMachine) over paired in-memory test sockets. |
| # |
| # supported_responses: handed to EcuAnsweringMachine(supported_responses=...). |
| # enumerators: handed to UDS_Scanner(test_cases=...). |
| # unstable_socket: if true the tester side uses UnstableSocket, otherwise TestSocket. |
| # kwargs: forwarded unchanged to the UDS_Scanner constructor. |
| # Returns the UDS_Scanner instance once the scan loop and the cleanup have run. |
| def executeScannerInVirtualEnvironment(supported_responses, enumerators, unstable_socket=True, **kwargs): |
| tester_obj_pipe = ObjectPipe(name="TesterPipe") |
| ecu_obj_pipe = ObjectPipe(name="ECUPipe") |
| TesterSocket = UnstableSocket if unstable_socket else TestSocket |
| tester = TesterSocket(UDS, tester_obj_pipe) |
| ecu = TestSocket(UDS, ecu_obj_pipe) |
| tester.pair(ecu) |
| answering_machine = EcuAnsweringMachine( |
| supported_responses=supported_responses, main_socket=ecu, |
| basecls=UDS, verbose=False) |
| # reset_handler: reset the simulated ECU state, set session to 1, then sniff both sockets |
| # for timeout=0.001 (presumably to drain packets still queued - TODO confirm). |
| def reset(): |
| answering_machine.state.reset() |
| answering_machine.state["session"] = 1 |
| sniff(timeout=0.001, opened_socket=[ecu, tester]) |
| # reconnect_handler: create a new tester socket on the same pipe, pair the ECU socket with it |
| # and return the new socket. |
| # NOTE(review): `tester` is assigned inside this function, so it is a local name here and |
| # tester.close() raises UnboundLocalError, which the `except Exception: pass` swallows; the |
| # previous tester socket is therefore not closed by this handler. Confirm this is intended. |
| def reconnect(): |
| try: |
| tester.close() |
| except Exception: |
| pass |
| tester = TesterSocket(UDS, tester_obj_pipe) |
| ecu.pair(tester) |
| return tester |
| # Thread target: run the answering machine with timeout=120 and stop as soon as a packet |
| # whose bytes equal b"\xff\xff\xff" is seen. |
| def answering_machine_thread(): |
| answering_machine( |
| timeout=120, stop_filter=lambda x: bytes(x) == b"\xff\xff\xff") |
| sim = threading.Thread(target=answering_machine_thread) |
| try: |
| sim.start() |
| scanner = UDS_Scanner( |
| tester, reset_handler=reset, reconnect_handler=reconnect, |
| test_cases=enumerators, timeout=0.1, |
| retry_if_none_received=True, unittest=True, |
| **kwargs) |
| # Call scan(timeout=10) up to 12 times and leave the loop once scan_completed is set. |
| for i in range(12): |
| print("Starting scan") |
| scanner.scan(timeout=10) |
| if scanner.scan_completed: |
| print("Scan completed after %d iterations" % i) |
| break |
| # Cleanup: push the stop packet into the ECU socket's input, wait up to 2 s for the |
| # simulation thread, require that it ended, then release test sockets and both pipes. |
| finally: |
| ecu.ins.send(Raw(b"\xff\xff\xff")) |
| sim.join(timeout=2) |
| assert not sim.is_alive() |
| cleanup_testsockets() |
| tester_obj_pipe.close() |
| ecu_obj_pipe.close() |
| # The pickle round-trip check is only performed when LINUX is truthy. |
| if LINUX: |
| pickle_test(scanner) |
| return scanner |
| |
| # Pickle the scanner into an in-memory buffer, load it back and assert that |
| # scan_completed and state_paths of the copy equal those of the original. |
| def pickle_test(scanner): |
| f = io.BytesIO() |
| pickle.dump(scanner, f) |
| unp = pickle.loads(f.getvalue()) |
| assert scanner.scan_completed == unp.scan_completed |
| assert scanner.state_paths == unp.state_paths |
| |
| = Load packets from pcap |
| |
| # Enable the CAN contrib 'swap-bytes' option, then read the recorded capture. |
| conf.contribs['CAN']['swap-bytes'] = True |
| pkts = rdpcap(scapy_path("test/pcaps/candump_uds_scanner.pcap.gz")) |
| # The capture must not be empty. |
| assert len(pkts) |
| |
| = Create UDS messages from packets |
| |
| # Message builder producing UDS packets for rx ids 0x641 and 0x651 without extended addressing. |
| builder = ISOTPMessageBuilder(basecls=UDS, use_ext_address=False, rx_id=[0x641, 0x651]) |
| msgs = list() |
| |
| # Frames whose data is b"ECURESET" are stored unchanged as reset markers; all other frames |
| # are fed to the builder, and a message is popped from it whenever it holds one. |
| for p in pkts: |
| if p.data == b"ECURESET": |
| msgs.append(p) |
| else: |
| builder.feed(p) |
| if len(builder): |
| msgs.append(builder.pop()) |
| |
| # At least one message must have been collected. |
| assert len(msgs) |
| |
| = Create ECU-Clone from packets |
| |
| # Ecu object configured to store supported responses (lookahead=3, no logging, not verbose). |
| mEcu = Ecu(logging=False, verbose=False, store_supported_responses=True, lookahead=3) |
| |
| # Replay the collected messages: an ECURESET marker frame resets the Ecu, everything else updates it. |
| for p in msgs: |
| if isinstance(p, CAN) and p.data == b"ECURESET": |
| mEcu.reset() |
| else: |
| mEcu.update(p) |
| |
| # The replay must have produced at least one supported response. |
| assert len(mEcu.supported_responses) |
| |
| = Test UDS_SAEnumerator evaluate_response |
| |
| # Call UDS_SAEnumerator._evaluate_response directly for the state session=1 and check, |
| # after each call, both the return value and the _retry_pkt entry stored for that state. |
| e = UDS_SAEnumerator() |
| |
| config = {} |
| |
| s = EcuState(session=1) |
| |
| # No response at all, evaluated with an empty configuration. |
| assert False == e._evaluate_response(s, UDS(b"\x27\x01"), None, **config) |
| # All remaining calls pass exit_if_service_not_supported=True. |
| config = {"exit_if_service_not_supported": True} |
| assert not e._retry_pkt[s] |
| # Negative response with code 0x11. |
| assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x11"), **config) |
| assert not e._retry_pkt[s] |
| # Negative response with code 0x24: the request is stored in _retry_pkt. |
| assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x24"), **config) |
| assert e._retry_pkt[s] == UDS(b"\x27\x01") |
| assert False == e._evaluate_response(s, UDS(b"\x27\x02"), UDS(b"\x7f\x27\x24"), **config) |
| assert not e._retry_pkt[s] |
| # Negative response with code 0x37: stored for retry on the first call, cleared on the second. |
| assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config) |
| assert e._retry_pkt[s] == UDS(b"\x27\x01") |
| assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x7f\x27\x37"), **config) |
| assert not e._retry_pkt[s] |
| # Positive responses (0x67) with sub-function 0x01 and 0x02. |
| assert True == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x01ab"), **config) |
| assert not e._retry_pkt[s] |
| assert False == e._evaluate_response(s, UDS(b"\x27\x01"), UDS(b"\x67\x02ab"), **config) |
| assert not e._retry_pkt[s] |
| |
| |
| = Test UDS_SA_XOR_Enumerator stand alone mode |
| |
| # Pair two plain TestSockets and serve the cloned ECU responses from a background thread. |
| TesterSocket = TestSocket |
| ecu_sock = TestSocket(UDS) |
| mTester = TesterSocket(UDS) |
| ecu_sock.pair(mTester) |
| answering_machine = EcuAnsweringMachine(supported_responses=mEcu.supported_responses, main_socket=ecu_sock, basecls=UDS, verbose=False) |
| sim = threading.Thread(target=answering_machine, kwargs={'timeout': 1000, "stop_filter": lambda x: bytes(x) == b"\xff\xff\xff"}) |
| sim.start() |
| try: |
| # Both requests must be answered, and not with a negative response (service 0x7f). |
| resp = mTester.sr1(UDS()/UDS_TP(b"\x00"), verbose=False, timeout=1) |
| print(repr(resp)) |
| assert resp and resp.service != 0x7f |
| resp = mTester.sr1(UDS()/UDS_DSC(diagnosticSessionType=3), verbose=False, timeout=1) |
| print(repr(resp)) |
| assert resp and resp.service != 0x7f |
| # get_security_access must return a truthy value when called with level 1. |
| assert UDS_SA_XOR_Enumerator().get_security_access(mTester, 1) |
| # Always send the stop packet, wait up to 2 s for the thread and clean up the test sockets. |
| finally: |
| mTester.send(Raw(b"\xff\xff\xff")) |
| sim.join(timeout=2) |
| cleanup_testsockets() |
| |
| |
| = Test configuration validation |
| |
| # Building a scanner with this configuration is expected to raise Scapy_Exception. |
| # If no exception is raised, the `assert False` fails the test. |
| try: |
| scanner = UDS_Scanner(TestSocket(UDS), |
| test_cases=[UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator], |
| UDS_DSCEnumerator_kwargs={"scan_range": range(0x1000), "delay_state_change": 0, |
| "overwrite_timeout": False}) |
| assert False |
| except Scapy_Exception: |
| pass |
| |
| = Simulate ECU and run Scanner |
| |
| # Scan the cloned ECU with three enumerators; each one gets a restricted scan_range |
| # through its own <EnumeratorName>_kwargs dictionary. |
| scanner = executeScannerInVirtualEnvironment( |
| mEcu.supported_responses, |
| [UDS_SA_XOR_Enumerator, UDS_DSCEnumerator, UDS_ServiceEnumerator], |
| UDS_DSCEnumerator_kwargs={"scan_range": range(5), "delay_state_change": 0, |
| "overwrite_timeout": False}, |
| UDS_SA_XOR_Enumerator_kwargs={"scan_range": range(5)}, |
| UDS_ServiceEnumerator_kwargs={"scan_range": [0x10, 0x11, 0x14, 0x19, 0x22, |
| 0x23, 0x24, 0x27, 0x28, 0x29, |
| 0x2A, 0x2C, 0x2E, 0x2F, 0x31, |
| 0x34, 0x35, 0x36, 0x37, 0x38, |
| 0x3D, 0x3E, 0x83, 0x84, 0x85, |
| 0x87], |
| "request_length": 1}) |
| |
| scanner.show_testcases() |
| scanner.show_testcases_status() |
| # The scan must finish and end up with exactly five state paths. |
| assert len(scanner.state_paths) == 5 |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| # Each of these five ECU states must be among the final states. |
| assert EcuState(session=1) in scanner.final_states |
| assert EcuState(session=2, tp=1) in scanner.final_states |
| assert EcuState(session=3, tp=1) in scanner.final_states |
| assert EcuState(session=2, tp=1, security_level=2) in scanner.final_states |
| assert EcuState(session=3, tp=1, security_level=2) in scanner.final_states |
| |
| #################### UDS_SA_XOR_Enumerator ################ |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) == 19 |
| assert len(tc.results_with_positive_response) >= 6 |
| assert len(tc.scanned_states) == 5 |
| |
| result = tc.show(dump=True) |
| |
| assert "serviceNotSupportedInActiveSession received 5 times" in result |
| assert "incorrectMessageLengthOrInvalidFormat received 14 times" in result |
| |
| ################# UDS_DSCEnumerator ##################### |
| tc = scanner.configuration.test_cases[1] |
| |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) == 20 |
| assert len(tc.results_with_positive_response) == 5 |
| assert len(tc.scanned_states) == 5 |
| |
| result = tc.show(dump=True) |
| |
| assert "incorrectMessageLengthOrInvalidFormat received 20 times" in result |
| |
| ###################### UDS_ServiceEnumerator ################### |
| tc = scanner.configuration.test_cases[2] |
| |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) == 130 |
| assert len(tc.results_with_positive_response) == 0 |
| assert len(tc.scanned_states) == 5 |
| |
| result = tc.show(dump=True) |
| |
| assert "incorrectMessageLengthOrInvalidFormat received 34 times" in result |
| assert "serviceNotSupported received 75 times" in result |
| assert "serviceNotSupportedInActiveSession received 19 times" in result |
| assert "securityAccessDenied received 2 times" in result |
| |
| = UDS_ServiceEnumerator |
| |
| # Callback given to EcuResponse as its third argument in this section. |
| # Only requests with service 0x22 are handled; the negative response code written to |
| # `resp` depends on len(req): |
| #   1 -> generalReject |
| #   2 -> incorrectMessageLengthOrInvalidFormat |
| #   3 -> requestOutOfRange |
| # Returns True in those three cases and False for any other service or length. |
| def req_handler(resp, req): |
| if req.service != 0x22: |
| return False |
| if len(req) == 1: |
| resp.negativeResponseCode="generalReject" |
| return True |
| if len(req) == 2: |
| resp.negativeResponseCode="incorrectMessageLengthOrInvalidFormat" |
| return True |
| if len(req) == 3: |
| resp.negativeResponseCode="requestOutOfRange" |
| return True |
| return False |
| |
| # A single negative-response template whose response code is filled in by req_handler. |
| resps = [EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")], req_handler)] |
| |
| es = [UDS_ServiceEnumerator] |
| |
| debug_dissector_backup = conf.debug_dissector |
| |
| # This enumerator sends corrupted packets, so debug_dissector has to be disabled |
| # for the scan; the previous value is restored right afterwards. |
| conf.debug_dissector = False |
| scanner = executeScannerInVirtualEnvironment( |
| resps, es, UDS_ServiceEnumerator_kwargs={"request_length": 3}, unstable_socket=False) |
| conf.debug_dissector = debug_dissector_backup |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| tc.show() |
| |
| # Expect 128 * 3 negative responses, no positive response and one scanned state. |
| assert len(tc.results_with_negative_response) == 128 * 3 |
| assert len(tc.results_with_positive_response) == 0 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "incorrectMessageLengthOrInvalidFormat" in result |
| assert "requestOutOfRange" in result |
| |
| = UDS_RDBIEnumerator |
| |
| # Positive responses for data identifiers 1, 2, 3 and 0xff, followed by one |
| # subFunctionNotSupported negative response for ReadDataByIdentifier. |
| resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] |
| |
| es = [UDS_RDBIEnumerator] |
| |
| # Scan identifiers 0x00..0xff. |
| scanner = executeScannerInVirtualEnvironment( |
| resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 4 of the 256 identifiers answer positively, the remaining 252 negatively. |
| assert len(tc.results_with_negative_response) == 256 - 4 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "asdfbeef1" in result |
| assert "beef2" in result |
| assert "beef3" in result |
| assert "beefff" in result |
| assert "subFunctionNotSupported received" in result |
| |
| # First identifier of every positively answered request. |
| ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0xff in ids |
| |
| = UDS_RDBISelectiveEnumerator |
| ~ not_pypy |
| |
| # 35 positive responses with data identifiers between 0x101 and 0x135, followed by one |
| # subFunctionNotSupported negative response for ReadDataByIdentifier. |
| resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x101)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x102)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x103)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x104)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x105)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x106)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x107)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x108)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x109)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x110)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x111)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x112)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x113)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x114)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x115)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x116)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x117)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x118)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x119)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x120)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x121)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x122)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x123)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x124)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x125)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x126)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x127)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x128)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x129)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x130)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x131)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x132)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x133)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x134)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0x135)/Raw(b"beef35")]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] |
| |
| es = [UDS_RDBISelectiveEnumerator] |
| |
| # The probe window 0..0x500 is handed over via UDS_RDBIRandomEnumerator_kwargs. |
| scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RDBIRandomEnumerator_kwargs={"probe_start": 0, "probe_end": 0x500}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| # First test case of the first stage. |
| tc = scanner.configuration.stages[0][0] |
| |
| tc.show() |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) > 100 |
| assert len(tc.results_with_positive_response) >= 1 |
| assert len(tc.scanned_states) == 1 |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| # Second test case of the first stage. |
| tc = scanner.configuration.stages[0][1] |
| |
| tc.show() |
| |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # All 35 configured identifiers must be found by the second test case. |
| assert len(tc.results_with_negative_response) == 29 |
| assert len(tc.results_with_positive_response) == 35 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "asdfbeef1" in result |
| assert "beef2" in result |
| assert "beef3" in result |
| assert "beef35" in result |
| assert "subFunctionNotSupported received" in result |
| |
| # First identifier of every positively answered request. |
| ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] |
| |
| assert 0x101 in ids |
| assert 0x102 in ids |
| assert 0x103 in ids |
| assert 0x135 in ids |
| |
| = UDS_WDBIEnumerator |
| |
| # `answers` callback of the UDS_WDBIPR EcuResponse in this section. |
| # Handles only requests with service 0x2E: asserts that the data identifier is one of |
| # 1, 2, 3 or 0xff, copies it onto the response, and asserts that the request payload |
| # equals the Raw data of the RDBI response configured for the same identifier. |
| # Returns True for a handled request and False for any other service. |
| def wdbi_handler(resp, req): |
| if req.service != 0x2E: |
| return False |
| assert req.dataIdentifier in [1, 2, 3, 0xff] |
| resp.dataIdentifier = req.dataIdentifier |
| if req.dataIdentifier == 1: |
| assert req.load == b'asdfbeef1' |
| return True |
| if req.dataIdentifier == 2: |
| assert req.load == b'beef2' |
| return True |
| if req.dataIdentifier == 3: |
| assert req.load == b"beef3" |
| return True |
| if req.dataIdentifier == 0xff: |
| assert req.load == b"beefff" |
| return True |
| return False |
| |
| # Four RDBI positive responses, one WDBI positive response validated by wdbi_handler, |
| # and one subFunctionNotSupported negative response for ReadDataByIdentifier. |
| resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), |
| EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] |
| |
| # The enumerator is passed as an instance here. |
| es = [UDS_WDBISelectiveEnumerator()] |
| |
| scanner = executeScannerInVirtualEnvironment( |
| resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| # First test case of the first stage; its requests expose `identifiers`. |
| tc = scanner.configuration.stages[0][0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) == 256 - 4 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "asdfbeef1" in result |
| assert "beef2" in result |
| assert "beef3" in result |
| assert "beefff" in result |
| assert "subFunctionNotSupported received" in result |
| |
| ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0xff in ids |
| |
| ######################### WDBI ############################# |
| # Second test case of the first stage; its requests expose `dataIdentifier`. |
| tc = scanner.configuration.stages[0][1] |
| |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # Every request of this test case must have been answered positively. |
| assert len(tc.results_with_negative_response) == 0 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0xff in ids |
| |
| = UDS_WDBIEnumerator 2 |
| |
| # Redefinition of wdbi_handler with the same body as the earlier definition in this file. |
| # Handles only requests with service 0x2E: asserts that the data identifier is one of |
| # 1, 2, 3 or 0xff, copies it onto the response, and asserts that the request payload |
| # matches the expected bytes for that identifier. |
| # Returns True for a handled request and False for any other service. |
| def wdbi_handler(resp, req): |
| if req.service != 0x2E: |
| return False |
| assert req.dataIdentifier in [1, 2, 3, 0xff] |
| resp.dataIdentifier = req.dataIdentifier |
| if req.dataIdentifier == 1: |
| assert req.load == b'asdfbeef1' |
| return True |
| if req.dataIdentifier == 2: |
| assert req.load == b'beef2' |
| return True |
| if req.dataIdentifier == 3: |
| assert req.load == b"beef3" |
| return True |
| if req.dataIdentifier == 0xff: |
| assert req.load == b"beefff" |
| return True |
| return False |
| |
| # Four RDBI positive responses, one WDBI positive response validated by wdbi_handler, |
| # and one subFunctionNotSupported negative response for ReadDataByIdentifier. |
| resps = [EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=2)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=3)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RDBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), |
| EcuResponse(None, [UDS()/UDS_WDBIPR()], answers=wdbi_handler), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataByIdentifier")])] |
| |
| # The enumerator is passed as a class here (not as an instance). |
| es = [UDS_WDBISelectiveEnumerator] |
| |
| scanner = executeScannerInVirtualEnvironment( |
| resps, es, UDS_RDBIEnumerator_kwargs={"scan_range": range(0x100)}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| # NOTE(review): this section indexes configuration.test_cases[0][...] while the otherwise |
| # identical section before it uses configuration.stages[0][...]; confirm that both |
| # expressions address the same test cases. |
| tc = scanner.configuration.test_cases[0][0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) == 256 - 4 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "asdfbeef1" in result |
| assert "beef2" in result |
| assert "beef3" in result |
| assert "beefff" in result |
| assert "subFunctionNotSupported received" in result |
| |
| ids = [t.req.identifiers[0] for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0xff in ids |
| |
| ######################### WDBI ############################# |
| tc = scanner.configuration.test_cases[0][1] |
| |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # Every request of this test case must have been answered positively. |
| assert len(tc.results_with_negative_response) == 0 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0xff in ids |
| |
| |
| = UDS_TPEnumerator |
| |
| # One TesterPresent positive response and one serviceNotSupported negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_TPPR()]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="serviceNotSupported", requestServiceId="TesterPresent")])] |
| |
| es = [UDS_TPEnumerator] |
| |
| scanner = executeScannerInVirtualEnvironment(resps, es) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # Expect two positive responses across two scanned states and no negative response. |
| assert len(tc.results_with_negative_response) == 0 |
| assert len(tc.results_with_positive_response) == 2 |
| assert len(tc.scanned_states) == 2 |
| |
| # The dumped report must not be empty. |
| assert tc.show(dump=True) |
| |
| = UDS_EREnumerator |
| |
| # Positive ECUReset responses for reset types 1 and 3, plus one |
| # subFunctionNotSupported negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_ERPR(resetType=1)]), |
| EcuResponse(None, [UDS()/UDS_ERPR(resetType=3)]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ECUReset")])] |
| |
| es = [UDS_EREnumerator] |
| |
| scanner = executeScannerInVirtualEnvironment(resps, es) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 2 of 256 requests answer positively, the remaining 254 negatively. |
| assert len(tc.results_with_negative_response) == 256 - 2 |
| assert len(tc.results_with_positive_response) == 2 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "hardReset" in result |
| assert "softReset" in result |
| assert "subFunctionNotSupported received" in result |
| |
| # Reset type of every positively answered request. |
| ids = [t.req.resetType for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 3 in ids |
| |
| |
| = UDS_CCEnumerator |
| |
| # Positive CommunicationControl responses for control types 1 and 3, plus one |
| # subFunctionNotSupported negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_CCPR(controlType=1)]), |
| EcuResponse(None, [UDS()/UDS_CCPR(controlType=3)]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="CommunicationControl")])] |
| |
| es = [UDS_CCEnumerator] |
| |
| # inter=0.001 is forwarded to the UDS_Scanner constructor through **kwargs. |
| scanner = executeScannerInVirtualEnvironment(resps, es, inter=0.001) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 2 of 256 requests answer positively, the remaining 254 negatively. |
| assert len(tc.results_with_negative_response) == 256 - 2 |
| assert len(tc.results_with_positive_response) == 2 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "enableRxAndDisableTx" in result |
| assert "disableRxAndTx" in result |
| assert "subFunctionNotSupported received" in result |
| |
| # Control type of every positively answered request. |
| ids = [t.req.controlType for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 3 in ids |
| |
| = UDS_RDBPIEnumerator |
| |
| # Register display names for periodic data identifiers 1 and 3; the report is later |
| # checked for "0x01 identifierElectric" and "0x03 identifierGas". |
| UDS_RDBPI.periodicDataIdentifiers[1] = "identifierElectric" |
| UDS_RDBPI.periodicDataIdentifiers[3] = "identifierGas" |
| |
| # Positive responses for periodic data identifiers 1 and 3, plus one |
| # subFunctionNotSupported negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=1, dataRecord=b'electric')]), |
| EcuResponse(None, [UDS()/UDS_RDBPIPR(periodicDataIdentifier=3, dataRecord=b'gas')]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="ReadDataPeriodicIdentifier")])] |
| |
| es = [UDS_RDBPIEnumerator] |
| |
| scanner = executeScannerInVirtualEnvironment(resps, es) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 2 of 256 requests answer positively, the remaining 254 negatively. |
| assert len(tc.results_with_negative_response) == 256 - 2 |
| assert len(tc.results_with_positive_response) == 2 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "electric" in result |
| assert "gas" in result |
| assert "0x01 identifierElectric" in result |
| assert "0x03 identifierGas" in result |
| assert "subFunctionNotSupported received" in result |
| |
| # Periodic data identifier of every positively answered request. |
| ids = [t.req.periodicDataIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 3 in ids |
| |
| |
| = UDS_RCEnumerator |
| |
| # Four positive RoutineControl responses (control type / routine identifier pairs |
| # 1/1, 2/2, 3/3 and 1/0x10) plus one subFunctionNotSupported negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=2)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=3)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])] |
| |
| es = [UDS_RCEnumerator] |
| |
| # Scan routine identifiers 0x00..0x10. |
| scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCEnumerator_kwargs={"scan_range": range(0x11)}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 0x11 * 3 results are expected in total, 4 of them positive. |
| assert len(tc.results_with_negative_response) == 0x11 * 3 - 4 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "subFunctionNotSupported received" in result |
| |
| # Routine identifier of every positively answered request. |
| ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0x10 in ids |
| |
| |
| = UDS_RCSelectiveEnumerator |
| |
| # Routine identifier 1 answers control types 1, 2 and 3; identifier 0x10 answers |
| # control type 1 only. Everything else gets subFunctionNotSupported. |
| resps = [EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=1)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=2, routineIdentifier=1)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=3, routineIdentifier=1)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_RCPR(routineControlType=1, routineIdentifier=0x10)/Raw(b"beefff")]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="RoutineControl")])] |
| |
| es = [UDS_RCSelectiveEnumerator] |
| |
| # The scan range 0x00..0x10 is handed over via UDS_RCStartEnumerator_kwargs. |
| scanner = executeScannerInVirtualEnvironment(resps, es, UDS_RCStartEnumerator_kwargs={"scan_range": range(0x11)}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| # First test case of the first stage. |
| tc = scanner.configuration.stages[0][0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 2 of the 0x11 requests answer positively. |
| assert len(tc.results_with_negative_response) == 0x11 - 2 |
| assert len(tc.results_with_positive_response) == 2 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "subFunctionNotSupported received" in result |
| |
| ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 0x10 in ids |
| |
| # Second test case of the first stage. |
| tc = scanner.configuration.stages[0][1] |
| |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| assert len(tc.results_with_negative_response) == 538 |
| assert len(tc.results_with_positive_response) == 2 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "subFunctionNotSupported received" in result |
| |
| ids = [t.req.routineIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| |
| = UDS_IOCBIEnumerator |
| |
| # Positive InputOutputControlByIdentifier responses for data identifiers 1, 2, 3 and 0xff, |
| # plus one subFunctionNotSupported negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=1)/Raw(b"asdfbeef1")]), |
| EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=2)/Raw(b"beef2")]), |
| EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=3)/Raw(b"beef3")]), |
| EcuResponse(None, [UDS()/UDS_IOCBIPR(dataIdentifier=0xff)/Raw(b"beefff")]), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="subFunctionNotSupported", requestServiceId="InputOutputControlByIdentifier")])] |
| |
| es = [UDS_IOCBIEnumerator] |
| |
| # Scan identifiers 0x00..0xff. |
| scanner = executeScannerInVirtualEnvironment(resps, es, UDS_IOCBIEnumerator_kwargs={"scan_range": range(0x100)}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc.results_without_response) < 10 |
| if tc.results_without_response: |
| tc.show() |
| |
| # 4 of the 0x100 identifiers answer positively. |
| assert len(tc.results_with_negative_response) == 0x100 - 4 |
| assert len(tc.results_with_positive_response) == 4 |
| assert len(tc.scanned_states) == 1 |
| |
| result = tc.show(dump=True) |
| |
| assert "asdfbeef1" in result |
| assert "beef2" in result |
| assert "beef3" in result |
| assert "beefff" in result |
| assert "subFunctionNotSupported received" in result |
| |
| # Data identifier of every positively answered request. |
| ids = [t.req.dataIdentifier for t in tc.results_with_positive_response] |
| |
| assert 1 in ids |
| assert 2 in ids |
| assert 3 in ids |
| assert 0xff in ids |
| |
| |
| = UDS_RDEnumerator |
| |
| # Simulated memory map: every address of the four ranges below maps to its low byte. |
| memory = dict() |
| |
| for addr in itertools.chain(range(0x1f00), range(0xd000, 0xfff2), range(0xa000, 0xcf00), range(0x2000, 0x5f00)): |
| memory[addr] = addr & 0xff |
| |
| # `answers` callback of the UDS_RDPR EcuResponse in this section. |
| # Accepts only requests with service 0x34 whose memorySizeLen and memoryAddressLen are |
| # not 1, 3 or 4 and whose address (read from the memoryAddress<memoryAddressLen> field) |
| # is a key of the global `memory` dict. For an accepted request the response's |
| # memorySizeLen is set to the request's value and True is returned; otherwise False. |
| def answers_rd(resp, req): |
| global memory |
| if req.service != 0x34: |
| return False |
| if req.memorySizeLen in [1, 3, 4]: |
| return False |
| if req.memoryAddressLen in [1, 3, 4]: |
| return False |
| addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen) |
| if addr not in memory.keys(): |
| return False |
| resp.memorySizeLen = req.memorySizeLen |
| return True |
| |
| # RequestDownload positive response gated by answers_rd; all other requests get |
| # a requestOutOfRange negative response. |
| resps = [EcuResponse(None, [UDS()/UDS_RDPR()], answers=answers_rd), |
| EcuResponse(None, [UDS()/UDS_NR(negativeResponseCode="requestOutOfRange", requestServiceId="RequestDownload")])] |
| |
| ####################################################### |
| # Run on a plain TestSocket (unstable_socket=False) with unittest=True for the enumerator. |
| scanner = executeScannerInVirtualEnvironment( |
| resps, [UDS_RDEnumerator], unstable_socket=False, |
| UDS_RDEnumerator_kwargs={"unittest": True}) |
| |
| assert scanner.scan_completed |
| assert scanner.progress() > 0.95 |
| |
| tc1 = scanner.configuration.test_cases[0] |
| |
| # Fewer than 10 unanswered requests are tolerated; show the test case if there are any. |
| assert len(tc1.results_without_response) < 10 |
| if len(tc1.results_without_response): |
| tc1.show() |
| |
| # Only lower bounds are checked for the result counts. |
| assert len(tc1.results_with_negative_response) > 400 |
| assert len(tc1.results_with_positive_response) > 40 |
| assert len(tc1.scanned_states) == 1 |
| |
| result = tc1.show(dump=True) |
| |
| assert "requestOutOfRange received " in result |
| |
| = UDS_RMBARandomEnumerator |
| |
| # With explicit arguments (4, 4, 10) the packet must carry those length fields, |
| # memorySize4 == 10 and some value in memoryAddress4. |
| pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt(4, 4, 10) |
| |
| assert pkt.memorySizeLen == 4 |
| assert pkt.memoryAddressLen == 4 |
| assert pkt.memorySize4 == 10 |
| assert pkt.memoryAddress4 is not None |
| |
| # Without arguments both length fields must lie in 1..4. |
| pkt = UDS_RMBARandomEnumerator._random_memory_addr_pkt() |
| |
| assert pkt.memorySizeLen in [1, 2, 3, 4] |
| assert pkt.memoryAddressLen in [1, 2, 3, 4] |
| |
| # Two consecutive calls without arguments are expected to yield different packets. |
| pkt2 = UDS_RMBARandomEnumerator._random_memory_addr_pkt() |
| |
| assert pkt != pkt2 |
| |
| |
| = UDS_RMBAEnumerator |
| ~ linux not_pypy |
| |
# Address regions (start inclusive, end exclusive) that back the simulated
# ECU memory of the ReadMemoryByAddress test.
mem_areas = [(0x100, 0x1f00), (0xd000, 0xff00), (0xa000, 0xc000), (0x3000, 0x5f00)]

mem_ranges = [range(s, e) for s, e in mem_areas]

# First and last address inside each region.
mem_inner_borders = [s for s, _ in mem_areas] + [e - 1 for _, e in mem_areas]

# Addresses directly below and directly above each region.
mem_outer_borders = [s - 1 for s, _ in mem_areas] + [e for _, e in mem_areas]

# Flatten the regions once; the list is reused for sampling and for filling
# the memory instead of being rebuilt for each of the 100 samples.
mem_addresses = list(itertools.chain(*mem_ranges))

mem_random_test_points = [random.choice(mem_addresses) for _ in range(100)]

# Every mapped address stores its own low byte.
memory = {addr: addr & 0xff for addr in mem_addresses}
| |
def answers_rmba(resp, req):
    """Tell whether ``resp`` answers the service 0x23 request ``req``.
    Returns True only when the request's size and address length fields are
    not 1, 3 or 4 and the requested start address is a key of the
    module-level ``memory`` dict; in that case ``resp.dataRecord`` is filled
    with the stored bytes. Returns False otherwise and leaves ``resp``
    untouched.
    """
    if req.service != 0x23:
        return False
    if req.memorySizeLen in (1, 3, 4) or req.memoryAddressLen in (1, 3, 4):
        return False
    # Field names depend on the announced address / size lengths.
    addr = getattr(req, "memoryAddress%d" % req.memoryAddressLen)
    if addr not in memory:
        return False
    size = getattr(req, "memorySize%d" % req.memorySizeLen)
    # Addresses in the requested window that are not in memory are skipped,
    # so the record can be shorter than the requested size.
    resp.dataRecord = bytes(
        [memory[i] for i in range(addr, addr + size) if i in memory])
    return True
| |
# answers_rmba() is registered as the answers callback of the positive
# response; the second entry is a requestOutOfRange negative response.
resps = [
    EcuResponse(None, [UDS() / UDS_RMBAPR(dataRecord=b'')], answers=answers_rmba),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="requestOutOfRange",
                                      requestServiceId="ReadMemoryByAddress")]),
]

scanner = executeScannerInVirtualEnvironment(
    resps,
    [UDS_RMBAEnumerator],
    unstable_socket=False,
    UDS_RMBARandomEnumerator_kwargs={"unittest": True},
)

assert scanner.scan_completed
# NOTE(review): element 1 of the first stage entry - presumably the second
# test case of the staged RMBA enumerator; confirm against its definition.
tc1 = scanner.configuration.stages[0][1]

# Some unanswered requests are tolerated; dump the results if there are any.
assert len(tc1.results_without_response) < 30
if tc1.results_without_response:
    tc1.show()

assert len(tc1.results_with_negative_response) > 10
assert len(tc1.results_with_positive_response) > 300
assert len(tc1.scanned_states) == 1

result = tc1.show(dump=True)

assert "requestOutOfRange received " in result

# Compare the addresses found by the scan with the simulated memory layout.
addrs = tc1._get_memory_addresses_from_results(tc1.results_with_positive_response)

# Share of first/last addresses of each region that were found.
inner_border_ratio = float(sum(tp in addrs for tp in mem_inner_borders)) / len(mem_inner_borders)
print(inner_border_ratio)
assert inner_border_ratio > 0.8

# Share of randomly sampled mapped addresses that were found.
random_point_ratio = float(sum(tp in addrs for tp in mem_random_test_points)) / len(mem_random_test_points)
print(random_point_ratio)
assert random_point_ratio > 0.8

# Share of addresses just outside each region that were correctly not found.
outer_border_ratio = float(sum(tp not in addrs for tp in mem_outer_borders)) / len(mem_outer_borders)
print(outer_border_ratio)
assert outer_border_ratio > 0.7
| |
| |
| = UDS_TDEnumerator |
| |
# Positive TransferData responses exist for block sequence counters 1 and 3;
# the last entry is a subFunctionNotSupported negative response.
resps = [
    EcuResponse(None, [UDS() / UDS_TDPR(blockSequenceCounter=1)]),
    EcuResponse(None, [UDS() / UDS_TDPR(blockSequenceCounter=3)]),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                                      requestServiceId="TransferData")]),
]

es = [UDS_TDEnumerator]

scanner = executeScannerInVirtualEnvironment(resps, es)

assert scanner.scan_completed
assert scanner.progress() > 0.95

tc = scanner.configuration.test_cases[0]

# A few unanswered requests are tolerated; dump the results if there are any.
assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

# 256 results are expected in total, two of them positive.
assert len(tc.results_with_negative_response) == 256 - 2
assert len(tc.results_with_positive_response) == 2
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

assert "subFunctionNotSupported received" in result

ids = [t.req.blockSequenceCounter for t in tc.results_with_positive_response]

assert 1 in ids
assert 3 in ids
| |
| = BMW_DevJobEnumerator |
| |
load_contrib("automotive.bmw.definitions")
load_contrib("automotive.bmw.enumerator")

# Four development-job identifiers have a positive response; the last entry
# is a subFunctionNotSupported negative response.
resps = [
    EcuResponse(None, [UDS() / DEV_JOB_PR(identifier=0xff00) / Raw(b"asdfbeef1")]),
    EcuResponse(None, [UDS() / DEV_JOB_PR(identifier=0xff02) / Raw(b"beef2")]),
    EcuResponse(None, [UDS() / DEV_JOB_PR(identifier=0xff03) / Raw(b"beef3")]),
    EcuResponse(None, [UDS() / DEV_JOB_PR(identifier=0xffff) / Raw(b"beefff")]),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                                      requestServiceId="DevelopmentJob")]),
]

es = [BMW_DevJobEnumerator]

# Restrict the scan to the 0x100 identifiers 0xFF00..0xFFFF.
scanner = executeScannerInVirtualEnvironment(
    resps, es,
    BMW_DevJobEnumerator_kwargs={"scan_range": range(0xFF00, 0x10000)})

assert scanner.scan_completed
assert scanner.progress() > 0.95

tc = scanner.configuration.test_cases[0]

# A few unanswered requests are tolerated; dump the results if there are any.
assert len(tc.results_without_response) < 10
if tc.results_without_response:
    tc.show()

assert len(tc.results_with_negative_response) == 0x100 - 4
assert len(tc.results_with_positive_response) == 4
assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

# Strings expected in the textual result dump.
assert "ReadTransportMessageStatus" in result
assert "65282" in result
assert "65283" in result
assert "ReadMemory" in result
assert "subFunctionNotSupported received" in result
assert "PR: Supported" in result

ids = [t.req.identifier for t in tc.results_with_positive_response]

assert 0xff00 in ids
assert 0xff02 in ids
assert 0xff03 in ids
assert 0xffff in ids
| |
| = UDS_ServiceEnumerator weird issue |
| |
# Negative responses only: one with code 0x13 for service 0x40 and
# subFunctionNotSupported for services 0x41, 0x11, 0x42 and 0x43.
resps = [
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode=0x13,
                                      requestServiceId=0x40)]),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                                      requestServiceId=0x41)]),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                                      requestServiceId=0x11)]),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                                      requestServiceId=0x42)]),
    EcuResponse(None, [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                                      requestServiceId=0x43)]),
]

es = [UDS_ServiceEnumerator]

# Only four of the five services above are part of the scan range.
scanner = executeScannerInVirtualEnvironment(
    resps, es,
    UDS_ServiceEnumerator_kwargs={"scan_range": [0x11, 0x40, 0x41, 0x42]})

assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]
tc.show()

# One negative result per scanned service.
assert len(tc.results_with_negative_response) == 4
| |
| = UDS_ServiceEnumerator, all range |
| |
def req_handler(resp, req):
    """Pick the negative response code for a service 0x22 request by length.
    Returns True and sets ``resp.negativeResponseCode`` when ``req`` is a
    service 0x22 request of length 1, 2 or 3; returns False otherwise and
    leaves ``resp`` untouched.
    """
    if req.service != 0x22:
        return False
    # Request length -> negative response code to report.
    nrc_by_length = {
        1: "generalReject",
        2: "incorrectMessageLengthOrInvalidFormat",
        3: "requestOutOfRange",
    }
    nrc = nrc_by_length.get(len(req))
    if nrc is None:
        return False
    resp.negativeResponseCode = nrc
    return True
| |
# Single negative response; req_handler is passed as the third argument of
# EcuResponse and adjusts the response code for service 0x22 requests.
resps = [
    EcuResponse(
        None,
        [UDS() / UDS_NR(negativeResponseCode="subFunctionNotSupported",
                        requestServiceId="ReadDataByIdentifier")],
        req_handler),
]

es = [UDS_ServiceEnumerator]

debug_dissector_backup = conf.debug_dissector

# This enumerator sends corrupted packets, so the debug dissector has to be
# disabled while it runs. The finally clause restores the previous setting
# even when the scan raises.
conf.debug_dissector = False
try:
    scanner = executeScannerInVirtualEnvironment(
        resps, es,
        UDS_ServiceEnumerator_kwargs={"request_length": 3, "scan_range": range(256)},
        unstable_socket=False)
finally:
    conf.debug_dissector = debug_dissector_backup

assert scanner.scan_completed
assert scanner.progress() > 0.95
tc = scanner.configuration.test_cases[0]

assert len(tc.results_without_response) < 10

# Always print the results once (the former extra conditional dump was
# redundant because this call followed it unconditionally).
tc.show()

assert len(tc.scanned_states) == 1

result = tc.show(dump=True)

assert "incorrectMessageLengthOrInvalidFormat" in result
assert "requestOutOfRange" in result
| |
| + Cleanup |
| |
| = Delete testsockets |
| |
| |
# Final campaign step: call the cleanup helper imported from test.testsocket
# (presumably closes the test sockets opened by the tests above - verify).
| cleanup_testsockets() |